Remove ThreadPool in connection release path 63/78463/9
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 5 Dec 2018 15:13:19 +0000 (16:13 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 12 Dec 2018 10:52:55 +0000 (11:52 +0100)
This is a simple read operation, there is no need to allocate
a threadpool to handle the read requests.

Change-Id: I2d482e052a8f321d69f621065a8e11948234a1d8
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/AdjRibInWriter.java

index 85bbc5c0039466278c5a7b7549dd561e44fae521..da8352846b125b2ecd480341b6775c55f55441b2 100644 (file)
@@ -13,7 +13,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
-import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -26,17 +25,14 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.mdsal.common.api.CommitInfo;
@@ -353,53 +349,57 @@ final class AdjRibInWriter {
         }
     }
 
-    final void storeStaleRoutes(final Set<TablesKey> gracefulTables) {
+    void storeStaleRoutes(final Set<TablesKey> gracefulTables) {
+        final CountDownLatch latch = new CountDownLatch(gracefulTables.size());
+
         try (final DOMDataReadOnlyTransaction tx = this.chain.getDomChain().newReadOnlyTransaction()) {
-            final Map<TablesKey, Future> readFutures = new HashMap<>();
-            final ExecutorService threadPool = Executors.newCachedThreadPool();
-            gracefulTables.forEach(tablesKey -> {
+            for (TablesKey tablesKey : gracefulTables) {
                 final TableContext ctx = this.tables.get(tablesKey);
                 if (ctx == null) {
                     LOG.warn("Missing table for address family {}", tablesKey);
-                    return;
+                    latch.countDown();
+                    continue;
                 }
-                final YangInstanceIdentifier iid = ctx.routesPath();
-                final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture =
-                        tx.read(LogicalDatastoreType.OPERATIONAL, iid);
-                readFutures.put(tablesKey, readFuture);
-                Futures.addCallback(readFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+
+                Futures.addCallback(tx.read(LogicalDatastoreType.OPERATIONAL, ctx.routesPath()),
+                    new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
                     @Override
                     public void onSuccess(final Optional<NormalizedNode<?, ?>> routesOptional) {
-                        synchronized (AdjRibInWriter.this.staleRoutesRegistry) {
+                        try {
                             if (routesOptional.isPresent()) {
-                                final MapNode routesNode = (MapNode) routesOptional.get();
-                                final List<NodeIdentifierWithPredicates> routes = routesNode.getValue().stream()
-                                        .map(MapEntryNode::getIdentifier)
-                                        .collect(Collectors.toList());
-                                if (!routes.isEmpty()) {
-                                    AdjRibInWriter.this.staleRoutesRegistry.put(tablesKey, routes);
+                                synchronized (AdjRibInWriter.this.staleRoutesRegistry) {
+                                    final MapNode routesNode = (MapNode) routesOptional.get();
+                                    final List<NodeIdentifierWithPredicates> routes = routesNode.getValue().stream()
+                                            .map(MapEntryNode::getIdentifier)
+                                            .collect(Collectors.toList());
+                                    if (!routes.isEmpty()) {
+                                        AdjRibInWriter.this.staleRoutesRegistry.put(tablesKey, routes);
+                                    }
                                 }
                             }
+                        } finally {
+                            latch.countDown();
                         }
                     }
 
                     @Override
                     public void onFailure(final Throwable throwable) {
                         LOG.warn("Failed to store stale routes for table {}", tablesKey, throwable);
+                        latch.countDown();
                     }
-                }, threadPool);
-            });
-            readFutures.entrySet().stream().forEach(entry -> {
-                try {
-                    entry.getValue().get();
-                } catch (final InterruptedException | ExecutionException e) {
-                    LOG.warn("Failed to store stale routes for table {}", entry.getKey(), e);
-                }
-            });
+                }, MoreExecutors.directExecutor());
+            }
+        }
+
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while waiting to store stale routes with {} tasks of {} to finish", latch.getCount(),
+                gracefulTables, e);
         }
     }
 
-    final void removeStaleRoutes(final TablesKey tableKey) {
+    void removeStaleRoutes(final TablesKey tableKey) {
         final TableContext ctx = this.tables.get(tableKey);
         if (ctx == null) {
             LOG.debug("No table for {}, not removing any stale routes", tableKey);
@@ -433,7 +433,7 @@ final class AdjRibInWriter {
         }, MoreExecutors.directExecutor());
     }
 
-    final FluentFuture<? extends CommitInfo> clearTables(final Set<TablesKey> tablesToClear) {
+    FluentFuture<? extends CommitInfo> clearTables(final Set<TablesKey> tablesToClear) {
         if (tablesToClear == null || tablesToClear.isEmpty()) {
             return CommitInfo.emptyFluentFuture();
         }