X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Ftools%2Fnetconf-testtool%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Ftest%2Ftool%2FExecution.java;h=05957204b73fc78ca07707fd85ddd0458dbe66b5;hb=refs%2Fchanges%2F10%2F95210%2F46;hp=5fc8d9ba9f18ee4c36335ae23950a9183b5f22a3;hpb=b5f366541735dca06d4008845dbcc49bda725190;p=netconf.git diff --git a/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/Execution.java b/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/Execution.java index 5fc8d9ba9f..05957204b7 100644 --- a/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/Execution.java +++ b/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/Execution.java @@ -5,147 +5,132 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - - package org.opendaylight.netconf.test.tool; -import com.ning.http.client.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import com.google.common.collect.Lists; import java.io.IOException; -import java.util.ArrayList; +import java.net.Authenticator; +import java.net.PasswordAuthentication; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpRequest.BodyPublishers; +import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandlers; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Locale; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class Execution implements Callable { - - private final ArrayList payloads; - private final AsyncHttpClient asyncHttpClient; +final class Execution implements Callable { private static final Logger LOG = LoggerFactory.getLogger(Execution.class); - private final boolean invokeAsync; - private final Semaphore semaphore; - private final int throttle; + private static final String NETCONF_TOPOLOGY_DESTINATION = + "http://%s:%s/restconf/config/network-topology:network-topology/topology/topology-netconf"; - static final class DestToPayload { - - private final String destination; - private final String payload; - - public DestToPayload(String destination, String payload) { - this.destination = destination; - this.payload = payload; - } - - public String getDestination() { - return destination; - } + private final HttpClient httpClient; + private final String destination; + private final List openDevices; + private final TesttoolParameters params; + private final Semaphore semaphore; - public String getPayload() { - return payload; - } - } + private final int throttle; + private final boolean isAsync; + + Execution(final List openDevices, final TesttoolParameters params) { + httpClient = HttpClient.newBuilder() + .authenticator(new Authenticator() { + @Override + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(params.controllerAuthUsername, + params.controllerAuthPassword.toCharArray()); + } + }) + .build(); + destination = String.format(Locale.ROOT, NETCONF_TOPOLOGY_DESTINATION, + params.controllerIp, params.controllerPort); + this.openDevices = openDevices; + this.params = params; - public Execution(TesttoolParameters params, ArrayList payloads) { - this.invokeAsync = params.async; - this.throttle = params.throttle / params.threadAmount; + throttle = params.throttle / params.threadAmount; + isAsync = params.async; if (params.async && params.threadAmount > 1) { - LOG.info("Throttling per thread: {}", this.throttle); + LOG.info("Throttling per thread: {}", throttle); } - this.semaphore = new Semaphore(this.throttle); - - this.asyncHttpClient = new AsyncHttpClient(new AsyncHttpClientConfig.Builder() - .setConnectTimeout(Integer.MAX_VALUE) - .setRequestTimeout(Integer.MAX_VALUE) - .setAllowPoolingConnections(true) - .build()); - - this.payloads = new ArrayList<>(); - for (DestToPayload payload : payloads) { - AsyncHttpClient.BoundRequestBuilder requestBuilder = asyncHttpClient.preparePut(payload.getDestination()) - .addHeader("Content-Type", "application/xml") - .addHeader("Accept", "application/xml") - .setBody(payload.getPayload()) - .setRequestTimeout(Integer.MAX_VALUE); + semaphore = new Semaphore(throttle); + } - if (params.auth != null) { - requestBuilder.setRealm(new Realm.RealmBuilder() - .setScheme(Realm.AuthScheme.BASIC) - .setPrincipal(params.auth.get(0)) - .setPassword(params.auth.get(1)) - .setMethodName("PUT") - .setUsePreemptiveAuth(true) - .build()); - } - this.payloads.add(requestBuilder.build()); + @Override + public Void call() { + final List requests = prepareRequests(); + if (isAsync) { + this.sendAsync(requests); + } else { + this.sendSync(requests); } + return null; } - private void invokeSync() { - LOG.info("Begin sending sync requests"); - for (Request request : payloads) { - try { - Response response = asyncHttpClient.executeRequest(request).get(); - if (response.getStatusCode() != 200 && response.getStatusCode() != 204) { - LOG.warn("Status code: {}", response.getStatusCode()); - LOG.warn("url: {}", request.getUrl()); - LOG.warn(response.getResponseBody()); - } - } catch (InterruptedException | ExecutionException | IOException e) { - LOG.warn(e.toString()); - } - } - LOG.info("End sending sync requests"); + private List prepareRequests() { + final List> batches = Lists.partition(openDevices, params.generateConfigBatchSize); + return batches.stream() + .map(b -> PayloadCreator.createStringPayload(b, params)) + .map(this::prepareRequest) + .collect(Collectors.toList()); } - private void invokeAsync() { - final ArrayList> futures = new ArrayList<>(); + private void sendAsync(final List requests) { LOG.info("Begin sending async requests"); - - for (final Request request : payloads) { + for (final HttpRequest request : requests) { try { semaphore.acquire(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { LOG.warn("Semaphore acquire interrupted"); } - futures.add(asyncHttpClient.executeRequest(request, new AsyncCompletionHandler() { - @Override - public STATE onStatusReceived(HttpResponseStatus status) throws Exception { - super.onStatusReceived(status); - if (status.getStatusCode() != 200 && status.getStatusCode() != 204) { - LOG.warn("Request failed, status code: {}", status.getStatusCode() + status.getStatusText()); - LOG.warn("request: {}", request.toString()); - } - return STATE.CONTINUE; - } - - @Override - public Response onCompleted(Response response) throws Exception { - semaphore.release(); - return response; + httpClient.sendAsync(request, BodyHandlers.ofString()).whenComplete((response, error) -> { + if (response.statusCode() != 200) { + LOG.warn("Unexpected status code: {} for request to uri: {} with body: {}", + response.statusCode(), request.uri(), response.body()); } - })); + semaphore.release(); + }); } LOG.info("Requests sent, waiting for responses"); - try { semaphore.acquire(this.throttle); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { LOG.warn("Semaphore acquire interrupted"); } - LOG.info("Responses received, ending..."); } - @Override - public Void call() throws Exception { - if (invokeAsync) { - this.invokeAsync(); - } else { - this.invokeSync(); + private void sendSync(final List requests) { + LOG.info("Begin sending sync requests"); + for (final HttpRequest request : requests) { + try { + final HttpResponse response = httpClient.send(request, BodyHandlers.ofString()); + if (response.statusCode() != 200) { + LOG.warn("Unexpected status code: {} for request to uri: {} with body: {}", + response.statusCode(), request.uri(), response.body()); + } + } catch (final InterruptedException | IOException e) { + LOG.error("Failed to execute request: {}", request, e); + throw new RuntimeException("Failed to execute request", e); + } } - return null; + LOG.info("End sending sync requests"); + } + + private HttpRequest prepareRequest(final String payload) { + LOG.info("Creating request to: {} with payload: {}", destination, payload); + return HttpRequest.newBuilder(URI.create(destination)) + .method("PATCH", BodyPublishers.ofString(payload, StandardCharsets.UTF_8)) + .header("Content-Type", "application/json") + .header("Accept", "application/json") + .build(); } }