BUG-6858: adapt to ise api, simultaneous queries 11/46611/1
authorMichal Rehak <mirehak@cisco.com>
Wed, 28 Sep 2016 15:55:47 +0000 (17:55 +0200)
committerMichal Rehak <mirehak@cisco.com>
Thu, 6 Oct 2016 12:28:05 +0000 (14:28 +0200)
    - use pool to query all sgt details

Change-Id: I3c19477f09a0720c9c8054db3abb4ad2232730d2
Signed-off-by: Michal Rehak <mirehak@cisco.com>
(cherry picked from commit 91afd65cfa15ac78b3243d7e9e9dda85a3ac1850)

sxp-integration/sxp-ise-adapter/src/main/java/org/opendaylight/groupbasedpolicy/sxp_ise_adapter/impl/GbpIseSgtHarvesterImpl.java

index 41cba2975a48503b5eaa56b1de5b7778b01501ff..4b610ca9d1feac21df0949ef3cb9b5bec9f29a15 100644 (file)
@@ -12,11 +12,19 @@ import com.google.common.base.Function;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.WebResource;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.xml.xpath.XPath;
@@ -92,7 +100,11 @@ public class GbpIseSgtHarvesterImpl implements GbpIseSgtHarvester {
 
     private List<SgtInfo> harvestDetails(final String rawSgtSummary, final WebResource baseWebResource, final List<Header> headers) {
         LOG.trace("rawSgtSummary: {}", rawSgtSummary);
-        final List<SgtInfo> sgtInfos = new ArrayList<>();
+        final List<Future<SgtInfo>> sgtInfoFutureBag = new ArrayList<>();
+
+        // prepare worker pool
+        final ExecutorService pool = Executors.newFixedThreadPool(
+                10, new ThreadFactoryBuilder().setNameFormat("ise-sgt-worker-%d").build());
 
         // parse sgtSummary
         final XPath xpath = IseReplyUtil.setupXpath();
@@ -105,28 +117,71 @@ public class GbpIseSgtHarvesterImpl implements GbpIseSgtHarvester {
                 final String sgtLinkHrefValue = sgtLinkNodes.item(i).getNodeValue();
                 LOG.debug("found sgt resource [{}]: {}", i, sgtLinkHrefValue);
 
-                // query all sgt entries (serial-vise)
-                final URI hrefToSgtDetailUri = URI.create(sgtLinkHrefValue);
-                final WebResource.Builder requestBuilder = RestClientFactory.createRequestBuilder(baseWebResource, headers, hrefToSgtDetailUri.getPath());
-                final String rawSgtDetail = IseReplyUtil.deliverResponse(requestBuilder);
-                LOG.trace("rawSgtDetail: {}", rawSgtDetail);
-
-                final Node sgtNode = (Node) xpath.evaluate(IseReplyUtil.EXPRESSION_SGT_DETAIL, IseReplyUtil.createInputSource(rawSgtDetail),
-                        XPathConstants.NODE);
-                final Node sgtName = (Node) xpath.evaluate(IseReplyUtil.EXPRESSION_SGT_NAME_ATTR, sgtNode, XPathConstants.NODE);
-                final Node sgtValue = (Node) xpath.evaluate(IseReplyUtil.EXPRESSION_SGT_VALUE, sgtNode, XPathConstants.NODE);
-                LOG.debug("sgt value [{}]: {} -> {}", i, sgtValue, sgtName);
-
-                // store replies into list of SgtInfo
-                final Sgt sgt = new Sgt(Integer.parseInt(sgtValue.getNodeValue(), 10));
-                final SgtInfo sgtInfo = new SgtInfo(sgt, sgtName.getNodeValue());
-                sgtInfos.add(sgtInfo);
+                // submit all query tasks to pool
+                final int idx = i;
+                sgtInfoFutureBag.add(pool.submit(new Callable<SgtInfo>() {
+                    @Override
+                    public SgtInfo call() {
+                        SgtInfo sgtInfo = null;
+                        try {
+                            sgtInfo = querySgtDetail(baseWebResource, headers, xpath, idx, sgtLinkHrefValue);
+                        } catch (XPathExpressionException e) {
+                            LOG.info("failed to parse sgt response for {}: {}", sgtLinkHrefValue, e.getMessage());
+                        }
+                        return sgtInfo;
+                    }
+                }));
+            }
+
+            // stop pool
+            pool.shutdown();
+            final boolean terminated = pool.awaitTermination(1, TimeUnit.MINUTES);
+            if (!terminated) {
+                LOG.debug("NOT all sgt-detail queries succeeded - timed out");
+                pool.shutdownNow();
+            }
+        } catch (InterruptedException | XPathExpressionException e) {
+            LOG.warn("failed to query all-sgt details", e);
+        }
+
+        // harvest available details
+        return sgtInfoFutureBag.stream()
+                .map(this::gainSgtInfoSafely)
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+    }
+
+    private SgtInfo gainSgtInfoSafely(final Future<SgtInfo> response) {
+        SgtInfo result = null;
+        if (response.isDone() && !response.isCancelled()) {
+            try {
+                result = response.get();
+            } catch (Exception e) {
+                LOG.debug("sgt-detail query failed even when future was DONE", e);
             }
-        } catch (XPathExpressionException e) {
-            LOG.warn("failed to parse all-sgt response", e);
         }
+        return result;
+    }
 
-        return sgtInfos;
+    private SgtInfo querySgtDetail(final WebResource baseWebResource, final List<Header> headers, final XPath xpath,
+                                   final int idx, final String sgtLinkHrefValue) throws XPathExpressionException {
+        // query all sgt entries (serial-vise)
+        final URI hrefToSgtDetailUri = URI.create(sgtLinkHrefValue);
+        final WebResource.Builder requestBuilder = RestClientFactory.createRequestBuilder(baseWebResource, headers, hrefToSgtDetailUri.getPath());
+        // time consuming operation - wait for rest response
+        final String rawSgtDetail = IseReplyUtil.deliverResponse(requestBuilder);
+        LOG.trace("rawSgtDetail: {}", rawSgtDetail);
+
+        // process response xml
+        final Node sgtNode = (Node) xpath.evaluate(IseReplyUtil.EXPRESSION_SGT_DETAIL, IseReplyUtil.createInputSource(rawSgtDetail),
+                XPathConstants.NODE);
+        final Node sgtName = (Node) xpath.evaluate(IseReplyUtil.EXPRESSION_SGT_NAME_ATTR, sgtNode, XPathConstants.NODE);
+        final Node sgtValue = (Node) xpath.evaluate(IseReplyUtil.EXPRESSION_SGT_VALUE, sgtNode, XPathConstants.NODE);
+        LOG.debug("sgt value [{}]: {} -> {}", idx, sgtValue, sgtName);
+
+        // store replies into list of SgtInfo
+        final Sgt sgt = new Sgt(Integer.parseInt(sgtValue.getNodeValue(), 10));
+        return new SgtInfo(sgt, sgtName.getNodeValue());
     }
 
 }