import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.WebResource;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.xml.xpath.XPath;
private List<SgtInfo> harvestDetails(final String rawSgtSummary, final WebResource baseWebResource, final List<Header> headers) {
LOG.trace("rawSgtSummary: {}", rawSgtSummary);
- final List<SgtInfo> sgtInfos = new ArrayList<>();
+ final List<Future<SgtInfo>> sgtInfoFutureBag = new ArrayList<>();
+
+ // prepare worker pool
+ final ExecutorService pool = Executors.newFixedThreadPool(
+ 10, new ThreadFactoryBuilder().setNameFormat("ise-sgt-worker-%d").build());
// parse sgtSummary
final XPath xpath = IseReplyUtil.setupXpath();
final String sgtLinkHrefValue = sgtLinkNodes.item(i).getNodeValue();
LOG.debug("found sgt resource [{}]: {}", i, sgtLinkHrefValue);
- // query all sgt entries (serial-vise)
- final URI hrefToSgtDetailUri = URI.create(sgtLinkHrefValue);
- final WebResource.Builder requestBuilder = RestClientFactory.createRequestBuilder(baseWebResource, headers, hrefToSgtDetailUri.getPath());
- final String rawSgtDetail = IseReplyUtil.deliverResponse(requestBuilder);
- LOG.trace("rawSgtDetail: {}", rawSgtDetail);
-
- final Node sgtNode = (Node) xpath.evaluate(IseReplyUtil.EXPRESSION_SGT_DETAIL, IseReplyUtil.createInputSource(rawSgtDetail),
- XPathConstants.NODE);
- final Node sgtName = (Node) xpath.evaluate(IseReplyUtil.EXPRESSION_SGT_NAME_ATTR, sgtNode, XPathConstants.NODE);
- final Node sgtValue = (Node) xpath.evaluate(IseReplyUtil.EXPRESSION_SGT_VALUE, sgtNode, XPathConstants.NODE);
- LOG.debug("sgt value [{}]: {} -> {}", i, sgtValue, sgtName);
-
- // store replies into list of SgtInfo
- final Sgt sgt = new Sgt(Integer.parseInt(sgtValue.getNodeValue(), 10));
- final SgtInfo sgtInfo = new SgtInfo(sgt, sgtName.getNodeValue());
- sgtInfos.add(sgtInfo);
+ // submit all query tasks to pool
+ final int idx = i;
+ sgtInfoFutureBag.add(pool.submit(new Callable<SgtInfo>() {
+ @Override
+ public SgtInfo call() {
+ SgtInfo sgtInfo = null;
+ try {
+ sgtInfo = querySgtDetail(baseWebResource, headers, xpath, idx, sgtLinkHrefValue);
+ } catch (XPathExpressionException e) {
+ LOG.info("failed to parse sgt response for {}: {}", sgtLinkHrefValue, e.getMessage());
+ }
+ return sgtInfo;
+ }
+ }));
+ }
+
+ // stop pool
+ pool.shutdown();
+ final boolean terminated = pool.awaitTermination(1, TimeUnit.MINUTES);
+ if (!terminated) {
+ LOG.debug("NOT all sgt-detail queries succeeded - timed out");
+ pool.shutdownNow();
+ }
+ } catch (InterruptedException | XPathExpressionException e) {
+ LOG.warn("failed to query all-sgt details", e);
+ }
+
+ // harvest available details
+ return sgtInfoFutureBag.stream()
+ .map(this::gainSgtInfoSafely)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ }
+
+ private SgtInfo gainSgtInfoSafely(final Future<SgtInfo> response) {
+ SgtInfo result = null;
+ if (response.isDone() && !response.isCancelled()) {
+ try {
+ result = response.get();
+ } catch (Exception e) {
+ LOG.debug("sgt-detail query failed even when future was DONE", e);
}
- } catch (XPathExpressionException e) {
- LOG.warn("failed to parse all-sgt response", e);
}
+ return result;
+ }
- return sgtInfos;
+ private SgtInfo querySgtDetail(final WebResource baseWebResource, final List<Header> headers, final XPath xpath,
+ final int idx, final String sgtLinkHrefValue) throws XPathExpressionException {
+ // query all sgt entries (serial-vise)
+ final URI hrefToSgtDetailUri = URI.create(sgtLinkHrefValue);
+ final WebResource.Builder requestBuilder = RestClientFactory.createRequestBuilder(baseWebResource, headers, hrefToSgtDetailUri.getPath());
+ // time consuming operation - wait for rest response
+ final String rawSgtDetail = IseReplyUtil.deliverResponse(requestBuilder);
+ LOG.trace("rawSgtDetail: {}", rawSgtDetail);
+
+ // process response xml
+ final Node sgtNode = (Node) xpath.evaluate(IseReplyUtil.EXPRESSION_SGT_DETAIL, IseReplyUtil.createInputSource(rawSgtDetail),
+ XPathConstants.NODE);
+ final Node sgtName = (Node) xpath.evaluate(IseReplyUtil.EXPRESSION_SGT_NAME_ATTR, sgtNode, XPathConstants.NODE);
+ final Node sgtValue = (Node) xpath.evaluate(IseReplyUtil.EXPRESSION_SGT_VALUE, sgtNode, XPathConstants.NODE);
+ LOG.debug("sgt value [{}]: {} -> {}", idx, sgtValue, sgtName);
+
+ // store replies into list of SgtInfo
+ final Sgt sgt = new Sgt(Integer.parseInt(sgtValue.getNodeValue(), 10));
+ return new SgtInfo(sgt, sgtName.getNodeValue());
}
}