import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
-import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.mdsal.common.api.CommitInfo;
}
}
- final void storeStaleRoutes(final Set<TablesKey> gracefulTables) {
+ void storeStaleRoutes(final Set<TablesKey> gracefulTables) {
+ final CountDownLatch latch = new CountDownLatch(gracefulTables.size());
+
try (final DOMDataReadOnlyTransaction tx = this.chain.getDomChain().newReadOnlyTransaction()) {
- final Map<TablesKey, Future> readFutures = new HashMap<>();
- final ExecutorService threadPool = Executors.newCachedThreadPool();
- gracefulTables.forEach(tablesKey -> {
+ for (TablesKey tablesKey : gracefulTables) {
final TableContext ctx = this.tables.get(tablesKey);
if (ctx == null) {
LOG.warn("Missing table for address family {}", tablesKey);
- return;
+ latch.countDown();
+ continue;
}
- final YangInstanceIdentifier iid = ctx.routesPath();
- final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture =
- tx.read(LogicalDatastoreType.OPERATIONAL, iid);
- readFutures.put(tablesKey, readFuture);
- Futures.addCallback(readFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+
+ Futures.addCallback(tx.read(LogicalDatastoreType.OPERATIONAL, ctx.routesPath()),
+ new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
@Override
public void onSuccess(final Optional<NormalizedNode<?, ?>> routesOptional) {
- synchronized (AdjRibInWriter.this.staleRoutesRegistry) {
+ try {
if (routesOptional.isPresent()) {
- final MapNode routesNode = (MapNode) routesOptional.get();
- final List<NodeIdentifierWithPredicates> routes = routesNode.getValue().stream()
- .map(MapEntryNode::getIdentifier)
- .collect(Collectors.toList());
- if (!routes.isEmpty()) {
- AdjRibInWriter.this.staleRoutesRegistry.put(tablesKey, routes);
+ synchronized (AdjRibInWriter.this.staleRoutesRegistry) {
+ final MapNode routesNode = (MapNode) routesOptional.get();
+ final List<NodeIdentifierWithPredicates> routes = routesNode.getValue().stream()
+ .map(MapEntryNode::getIdentifier)
+ .collect(Collectors.toList());
+ if (!routes.isEmpty()) {
+ AdjRibInWriter.this.staleRoutesRegistry.put(tablesKey, routes);
+ }
}
}
+ } finally {
+ latch.countDown();
}
}
@Override
public void onFailure(final Throwable throwable) {
LOG.warn("Failed to store stale routes for table {}", tablesKey, throwable);
+ latch.countDown();
}
- }, threadPool);
- });
- readFutures.entrySet().stream().forEach(entry -> {
- try {
- entry.getValue().get();
- } catch (final InterruptedException | ExecutionException e) {
- LOG.warn("Failed to store stale routes for table {}", entry.getKey(), e);
- }
- });
+ }, MoreExecutors.directExecutor());
+ }
+ }
+
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting to store stale routes with {} tasks of {} to finish", latch.getCount(),
+ gracefulTables, e);
}
}
- final void removeStaleRoutes(final TablesKey tableKey) {
+ void removeStaleRoutes(final TablesKey tableKey) {
final TableContext ctx = this.tables.get(tableKey);
if (ctx == null) {
LOG.debug("No table for {}, not removing any stale routes", tableKey);
}, MoreExecutors.directExecutor());
}
- final FluentFuture<? extends CommitInfo> clearTables(final Set<TablesKey> tablesToClear) {
+ FluentFuture<? extends CommitInfo> clearTables(final Set<TablesKey> tablesToClear) {
if (tablesToClear == null || tablesToClear.isEmpty()) {
return CommitInfo.emptyFluentFuture();
}