BUG-6858: adapt to ise api, simultaneous queries
[groupbasedpolicy.git] / sxp-integration / sxp-ise-adapter / src / main / java / org / opendaylight / groupbasedpolicy / sxp_ise_adapter / impl / GbpIseSgtHarvesterImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.sxp_ise_adapter.impl;
10
11 import com.google.common.base.Function;
12 import com.google.common.util.concurrent.AsyncFunction;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ThreadFactoryBuilder;
16 import com.sun.jersey.api.client.Client;
17 import com.sun.jersey.api.client.WebResource;
18 import java.net.URI;
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.Objects;
22 import java.util.concurrent.Callable;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.Future;
26 import java.util.concurrent.TimeUnit;
27 import java.util.stream.Collectors;
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import javax.xml.xpath.XPath;
31 import javax.xml.xpath.XPathConstants;
32 import javax.xml.xpath.XPathExpressionException;
33 import org.opendaylight.groupbasedpolicy.sxp_ise_adapter.impl.util.IseReplyUtil;
34 import org.opendaylight.groupbasedpolicy.sxp_ise_adapter.impl.util.RestClientFactory;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.groupbasedpolicy.sxp.integration.sxp.ise.adapter.model.rev160630.gbp.sxp.ise.adapter.IseSourceConfig;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.groupbasedpolicy.sxp.integration.sxp.ise.adapter.model.rev160630.gbp.sxp.ise.adapter.ise.source.config.ConnectionConfig;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.groupbasedpolicy.sxp.integration.sxp.ise.adapter.model.rev160630.gbp.sxp.ise.adapter.ise.source.config.connection.config.Header;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.sxp.database.rev160308.Sgt;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import org.w3c.dom.Node;
42 import org.w3c.dom.NodeList;
43 import org.xml.sax.InputSource;
44
45 /**
46  * Purpose: harvest sgt + names available via ise-rest-api
47  */
48 public class GbpIseSgtHarvesterImpl implements GbpIseSgtHarvester {
49
50     private static final Logger LOG = LoggerFactory.getLogger(GbpIseSgtHarvesterImpl.class);
51
52     private final SgtInfoProcessor[] sgtInfoProcessors;
53
54     /**
55      * @param sgtInfoProcessors generator delegate
56      */
57     public GbpIseSgtHarvesterImpl(final SgtInfoProcessor... sgtInfoProcessors) {
58         this.sgtInfoProcessors = sgtInfoProcessors;
59     }
60
61     @Override
62     public ListenableFuture<Integer> harvest(@Nonnull final IseSourceConfig configuration) {
63         final ConnectionConfig connectionConfig = configuration.getConnectionConfig();
64         ListenableFuture<Integer> result;
65         try {
66             final Client iseClient = RestClientFactory.createIseClient(connectionConfig);
67             final WebResource baseWebResource = iseClient.resource(connectionConfig.getIseRestUrl().getValue());
68
69             final WebResource.Builder requestBuilder = RestClientFactory.createRequestBuilder(baseWebResource,
70                     connectionConfig.getHeader(), RestClientFactory.PATH_ERS_CONFIG_SGT);
71             final String rawSgtSummary = IseReplyUtil.deliverResponse(requestBuilder);
72
73             final List<SgtInfo> sgtInfos = harvestDetails(rawSgtSummary, baseWebResource, connectionConfig.getHeader());
74
75             ListenableFuture<Void> processingResult = Futures.immediateCheckedFuture(null);
76             for (SgtInfoProcessor processor : sgtInfoProcessors) {
77                 processingResult = Futures.transform(processingResult, new AsyncFunction<Void, Void>() {
78                     @Override
79                     public ListenableFuture<Void> apply(final Void input) throws Exception {
80                         LOG.debug("entering stg-info processor {}", processor.getClass().getSimpleName());
81                         return processor.processSgtInfo(configuration.getTenant(), sgtInfos);
82                     }
83                 });
84             }
85             result = Futures.transform(processingResult, new Function<Void, Integer>() {
86                 @Nullable
87                 @Override
88                 public Integer apply(@Nullable final Void input) {
89                     // always success, otherwise there will be TransactionCommitFailedException thrown
90                     return sgtInfos.size();
91                 }
92             });
93         } catch (Exception e) {
94             LOG.debug("failed to harvest ise", e);
95             result = Futures.immediateFailedFuture(e);
96         }
97
98         return result;
99     }
100
101     private List<SgtInfo> harvestDetails(final String rawSgtSummary, final WebResource baseWebResource, final List<Header> headers) {
102         LOG.trace("rawSgtSummary: {}", rawSgtSummary);
103         final List<Future<SgtInfo>> sgtInfoFutureBag = new ArrayList<>();
104
105         // prepare worker pool
106         final ExecutorService pool = Executors.newFixedThreadPool(
107                 10, new ThreadFactoryBuilder().setNameFormat("ise-sgt-worker-%d").build());
108
109         // parse sgtSummary
110         final XPath xpath = IseReplyUtil.setupXpath();
111
112         InputSource inputSource = IseReplyUtil.createInputSource(rawSgtSummary);
113         try {
114             final NodeList sgtLinkNodes = (NodeList) xpath.evaluate(IseReplyUtil.EXPRESSION_SGT_ALL_LINK_HREFS, inputSource,
115                     XPathConstants.NODESET);
116             for (int i = 0; i < sgtLinkNodes.getLength(); i++) {
117                 final String sgtLinkHrefValue = sgtLinkNodes.item(i).getNodeValue();
118                 LOG.debug("found sgt resource [{}]: {}", i, sgtLinkHrefValue);
119
120                 // submit all query tasks to pool
121                 final int idx = i;
122                 sgtInfoFutureBag.add(pool.submit(new Callable<SgtInfo>() {
123                     @Override
124                     public SgtInfo call() {
125                         SgtInfo sgtInfo = null;
126                         try {
127                             sgtInfo = querySgtDetail(baseWebResource, headers, xpath, idx, sgtLinkHrefValue);
128                         } catch (XPathExpressionException e) {
129                             LOG.info("failed to parse sgt response for {}: {}", sgtLinkHrefValue, e.getMessage());
130                         }
131                         return sgtInfo;
132                     }
133                 }));
134             }
135
136             // stop pool
137             pool.shutdown();
138             final boolean terminated = pool.awaitTermination(1, TimeUnit.MINUTES);
139             if (!terminated) {
140                 LOG.debug("NOT all sgt-detail queries succeeded - timed out");
141                 pool.shutdownNow();
142             }
143         } catch (InterruptedException | XPathExpressionException e) {
144             LOG.warn("failed to query all-sgt details", e);
145         }
146
147         // harvest available details
148         return sgtInfoFutureBag.stream()
149                 .map(this::gainSgtInfoSafely)
150                 .filter(Objects::nonNull)
151                 .collect(Collectors.toList());
152     }
153
154     private SgtInfo gainSgtInfoSafely(final Future<SgtInfo> response) {
155         SgtInfo result = null;
156         if (response.isDone() && !response.isCancelled()) {
157             try {
158                 result = response.get();
159             } catch (Exception e) {
160                 LOG.debug("sgt-detail query failed even when future was DONE", e);
161             }
162         }
163         return result;
164     }
165
166     private SgtInfo querySgtDetail(final WebResource baseWebResource, final List<Header> headers, final XPath xpath,
167                                    final int idx, final String sgtLinkHrefValue) throws XPathExpressionException {
168         // query all sgt entries (serial-vise)
169         final URI hrefToSgtDetailUri = URI.create(sgtLinkHrefValue);
170         final WebResource.Builder requestBuilder = RestClientFactory.createRequestBuilder(baseWebResource, headers, hrefToSgtDetailUri.getPath());
171         // time consuming operation - wait for rest response
172         final String rawSgtDetail = IseReplyUtil.deliverResponse(requestBuilder);
173         LOG.trace("rawSgtDetail: {}", rawSgtDetail);
174
175         // process response xml
176         final Node sgtNode = (Node) xpath.evaluate(IseReplyUtil.EXPRESSION_SGT_DETAIL, IseReplyUtil.createInputSource(rawSgtDetail),
177                 XPathConstants.NODE);
178         final Node sgtName = (Node) xpath.evaluate(IseReplyUtil.EXPRESSION_SGT_NAME_ATTR, sgtNode, XPathConstants.NODE);
179         final Node sgtValue = (Node) xpath.evaluate(IseReplyUtil.EXPRESSION_SGT_VALUE, sgtNode, XPathConstants.NODE);
180         LOG.debug("sgt value [{}]: {} -> {}", idx, sgtValue, sgtName);
181
182         // store replies into list of SgtInfo
183         final Sgt sgt = new Sgt(Integer.parseInt(sgtValue.getNodeValue(), 10));
184         return new SgtInfo(sgt, sgtName.getNodeValue());
185     }
186
187 }