X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Ftools%2Fnetconf-testtool%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Ftest%2Ftool%2FExecution.java;h=05957204b73fc78ca07707fd85ddd0458dbe66b5;hb=13df03aa3e4ea0b1bb82020e219227f76bb88104;hp=d4e6683278d10178e5613fe1a81edc273a6c0b1e;hpb=47c1b8e3d9835d336c79d6b4ca4e61417a05039e;p=netconf.git diff --git a/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/Execution.java b/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/Execution.java index d4e6683278..05957204b7 100644 --- a/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/Execution.java +++ b/netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/Execution.java @@ -5,162 +5,132 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - - package org.opendaylight.netconf.test.tool; -import com.ning.http.client.AsyncCompletionHandler; -import com.ning.http.client.AsyncHttpClient; -import com.ning.http.client.AsyncHttpClientConfig; -import com.ning.http.client.HttpResponseStatus; -import com.ning.http.client.Realm; -import com.ning.http.client.Request; -import com.ning.http.client.Response; +import com.google.common.collect.Lists; import java.io.IOException; -import java.util.ArrayList; +import java.net.Authenticator; +import java.net.PasswordAuthentication; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpRequest.BodyPublishers; +import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandlers; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Locale; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class Execution implements Callable { - - private final ArrayList payloads; - private final AsyncHttpClient asyncHttpClient; +final class Execution implements Callable { private static final Logger LOG = LoggerFactory.getLogger(Execution.class); - private final boolean invokeAsync; - private final Semaphore semaphore; - private final int throttle; - - static final class DestToPayload { + private static final String NETCONF_TOPOLOGY_DESTINATION = + "http://%s:%s/restconf/config/network-topology:network-topology/topology/topology-netconf"; - private final String destination; - private final String payload; - - DestToPayload(String destination, String payload) { - this.destination = destination; - this.payload = payload; - } - - public String getDestination() { - return destination; - } + private final HttpClient httpClient; + private final String destination; + private final List openDevices; + private final TesttoolParameters params; + private final Semaphore semaphore; - public String getPayload() { - return payload; - } - } + private final int throttle; + private final boolean isAsync; + + Execution(final List openDevices, final TesttoolParameters params) { + httpClient = HttpClient.newBuilder() + .authenticator(new Authenticator() { + @Override + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(params.controllerAuthUsername, + params.controllerAuthPassword.toCharArray()); + } + }) + .build(); + destination = String.format(Locale.ROOT, NETCONF_TOPOLOGY_DESTINATION, + params.controllerIp, params.controllerPort); + this.openDevices = openDevices; + this.params = params; - public Execution(TesttoolParameters params, ArrayList payloads) { - this.invokeAsync = params.async; - this.throttle = params.throttle / params.threadAmount; + throttle = params.throttle / params.threadAmount; + isAsync = params.async; if (params.async && params.threadAmount > 1) { - LOG.info("Throttling per thread: {}", this.throttle); + LOG.info("Throttling per thread: {}", throttle); } - this.semaphore = new Semaphore(this.throttle); - - this.asyncHttpClient = new AsyncHttpClient(new AsyncHttpClientConfig.Builder() - .setConnectTimeout(Integer.MAX_VALUE) - .setRequestTimeout(Integer.MAX_VALUE) - .setAllowPoolingConnections(true) - .build()); - - this.payloads = new ArrayList<>(); - for (DestToPayload payload : payloads) { - AsyncHttpClient.BoundRequestBuilder requestBuilder = asyncHttpClient.preparePost(payload.getDestination()) - .addHeader("Content-Type", "application/json") - .addHeader("Accept", "application/json") - .setBody(payload.getPayload()) - .setRequestTimeout(Integer.MAX_VALUE); + semaphore = new Semaphore(throttle); + } - if (params.auth != null) { - requestBuilder.setRealm(new Realm.RealmBuilder() - .setScheme(Realm.AuthScheme.BASIC) - .setPrincipal(params.auth.get(0)) - .setPassword(params.auth.get(1)) - .setMethodName("POST") - .setUsePreemptiveAuth(true) - .build()); - } - this.payloads.add(requestBuilder.build()); + @Override + public Void call() { + final List requests = prepareRequests(); + if (isAsync) { + this.sendAsync(requests); + } else { + this.sendSync(requests); } + return null; } - private void invokeSync() { - LOG.info("Begin sending sync requests"); - for (Request request : payloads) { - try { - Response response = asyncHttpClient.executeRequest(request).get(); - if (response.getStatusCode() != 200 && response.getStatusCode() != 204) { - if (response.getStatusCode() == 409) { - LOG.warn("Request failed, status code: {} - one or more of the devices" - + " is already configured, skipping the whole batch", response.getStatusCode()); - } else { - LOG.warn("Status code: {}", response.getStatusCode()); - LOG.warn("url: {}", request.getUrl()); - LOG.warn(response.getResponseBody()); - } - } - } catch (InterruptedException | ExecutionException | IOException e) { - LOG.warn(e.toString()); - } - } - LOG.info("End sending sync requests"); + private List prepareRequests() { + final List> batches = Lists.partition(openDevices, params.generateConfigBatchSize); + return batches.stream() + .map(b -> PayloadCreator.createStringPayload(b, params)) + .map(this::prepareRequest) + .collect(Collectors.toList()); } - private void invokeAsync() { + private void sendAsync(final List requests) { LOG.info("Begin sending async requests"); - - for (final Request request : payloads) { + for (final HttpRequest request : requests) { try { semaphore.acquire(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { LOG.warn("Semaphore acquire interrupted"); } - asyncHttpClient.executeRequest(request, new AsyncCompletionHandler() { - @Override - public STATE onStatusReceived(HttpResponseStatus status) throws Exception { - super.onStatusReceived(status); - if (status.getStatusCode() != 200 && status.getStatusCode() != 204) { - if (status.getStatusCode() == 409) { - LOG.warn("Request failed, status code: {} - one or more of the devices" - + " is already configured, skipping the whole batch", status.getStatusCode()); - } else { - LOG.warn("Request failed, status code: {}", - status.getStatusCode() + status.getStatusText()); - LOG.warn("request: {}", request.toString()); - } - } - return STATE.CONTINUE; - } - - @Override - public Response onCompleted(Response response) throws Exception { - semaphore.release(); - return response; + httpClient.sendAsync(request, BodyHandlers.ofString()).whenComplete((response, error) -> { + if (response.statusCode() != 200) { + LOG.warn("Unexpected status code: {} for request to uri: {} with body: {}", + response.statusCode(), request.uri(), response.body()); } + semaphore.release(); }); } LOG.info("Requests sent, waiting for responses"); - try { semaphore.acquire(this.throttle); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { LOG.warn("Semaphore acquire interrupted"); } - LOG.info("Responses received, ending..."); } - @Override - public Void call() throws Exception { - if (invokeAsync) { - this.invokeAsync(); - } else { - this.invokeSync(); + private void sendSync(final List requests) { + LOG.info("Begin sending sync requests"); + for (final HttpRequest request : requests) { + try { + final HttpResponse response = httpClient.send(request, BodyHandlers.ofString()); + if (response.statusCode() != 200) { + LOG.warn("Unexpected status code: {} for request to uri: {} with body: {}", + response.statusCode(), request.uri(), response.body()); + } + } catch (final InterruptedException | IOException e) { + LOG.error("Failed to execute request: {}", request, e); + throw new RuntimeException("Failed to execute request", e); + } } - return null; + LOG.info("End sending sync requests"); + } + + private HttpRequest prepareRequest(final String payload) { + LOG.info("Creating request to: {} with payload: {}", destination, payload); + return HttpRequest.newBuilder(URI.create(destination)) + .method("PATCH", BodyPublishers.ofString(payload, StandardCharsets.UTF_8)) + .header("Content-Type", "application/json") + .header("Accept", "application/json") + .build(); } }