*/
package org.opendaylight.netconf.test.tool;
+import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.Authenticator;
import java.net.PasswordAuthentication;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class Execution implements Callable<Void> {
+final class Execution implements Callable<Void> {
+ private static final Logger LOG = LoggerFactory.getLogger(Execution.class);
+ private static final String NETCONF_TOPOLOGY_DESTINATION =
+ "http://%s:%s/restconf/config/network-topology:network-topology/topology/topology-netconf";
- private final ArrayList<HttpRequest> payloads;
private final HttpClient httpClient;
- private static final Logger LOG = LoggerFactory.getLogger(Execution.class);
- private final boolean invokeAsync;
+ private final String destination;
+ private final List<Integer> openDevices;
+ private final TesttoolParameters params;
private final Semaphore semaphore;
- private final int throttle;
-
- static final class DestToPayload {
-
- private final String destination;
- private final String payload;
-
- DestToPayload(final String destination, final String payload) {
- this.destination = destination;
- this.payload = payload;
- }
-
- public String getDestination() {
- return destination;
- }
-
- public String getPayload() {
- return payload;
- }
- }
- public Execution(final TesttoolParameters params, final ArrayList<DestToPayload> payloads) {
- this.invokeAsync = params.async;
- this.throttle = params.throttle / params.threadAmount;
-
- if (params.async && params.threadAmount > 1) {
- LOG.info("Throttling per thread: {}", this.throttle);
- }
- this.semaphore = new Semaphore(this.throttle);
+ private final int throttle;
+ private final boolean isAsync;
- this.httpClient = HttpClient.newBuilder()
+ Execution(final List<Integer> openDevices, final TesttoolParameters params) {
+ httpClient = HttpClient.newBuilder()
.authenticator(new Authenticator() {
@Override
protected PasswordAuthentication getPasswordAuthentication() {
}
})
.build();
- this.payloads = new ArrayList<>();
- for (DestToPayload payload : payloads) {
- HttpRequest request = HttpRequest.newBuilder(URI.create(payload.getDestination()))
- .POST(BodyPublishers.ofString(payload.getPayload(), StandardCharsets.UTF_8))
- .header("Content-Type", "application/json")
- .header("Accept", "application/json")
- .build();
+ destination = String.format(Locale.ROOT, NETCONF_TOPOLOGY_DESTINATION,
+ params.controllerIp, params.controllerPort);
+ this.openDevices = openDevices;
+ this.params = params;
+
+ throttle = params.throttle / params.threadAmount;
+ isAsync = params.async;
- this.payloads.add(request);
+ if (params.async && params.threadAmount > 1) {
+ LOG.info("Throttling per thread: {}", throttle);
}
+ semaphore = new Semaphore(throttle);
}
- private void invokeSync() {
- LOG.info("Begin sending sync requests");
- for (HttpRequest request : payloads) {
- try {
- HttpResponse<String> response = httpClient.send(request, BodyHandlers.ofString());
-
- if (response.statusCode() != 200 && response.statusCode() != 204) {
- if (response.statusCode() == 409) {
- LOG.warn("Request failed, status code: {} - one or more of the devices"
- + " is already configured, skipping the whole batch", response.statusCode());
- } else {
- LOG.warn("Status code: {}", response.statusCode());
- LOG.warn("url: {}", request.uri());
- LOG.warn("body: {}", response.body());
- }
- }
- } catch (InterruptedException | IOException e) {
- LOG.warn("Failed to execute request", e);
- }
+ @Override
+ public Void call() {
+ final List<HttpRequest> requests = prepareRequests();
+ if (isAsync) {
+ this.sendAsync(requests);
+ } else {
+ this.sendSync(requests);
}
- LOG.info("End sending sync requests");
+ return null;
}
- private void invokeAsync() {
- LOG.info("Begin sending async requests");
+ private List<HttpRequest> prepareRequests() {
+ final List<List<Integer>> batches = Lists.partition(openDevices, params.generateConfigBatchSize);
+ return batches.stream()
+ .map(b -> PayloadCreator.createStringPayload(b, params))
+ .map(this::prepareRequest)
+ .collect(Collectors.toList());
+ }
- for (final HttpRequest request : payloads) {
+ private void sendAsync(final List<HttpRequest> requests) {
+ LOG.info("Begin sending async requests");
+ for (final HttpRequest request : requests) {
try {
semaphore.acquire();
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
LOG.warn("Semaphore acquire interrupted");
}
httpClient.sendAsync(request, BodyHandlers.ofString()).whenComplete((response, error) -> {
- if (response.statusCode() != 200 && response.statusCode() != 204) {
- if (response.statusCode() == 409) {
- LOG.warn("Request failed, status code: {} - one or more of the devices"
- + " is already configured, skipping the whole batch", response.statusCode());
- } else {
- LOG.warn("Request failed, status code: {}", response.statusCode());
- LOG.warn("request: {}", request);
- }
+ if (response.statusCode() != 200) {
+ LOG.warn("Unexpected status code: {} for request to uri: {} with body: {}",
+ response.statusCode(), request.uri(), response.body());
}
semaphore.release();
});
}
LOG.info("Requests sent, waiting for responses");
-
try {
semaphore.acquire(this.throttle);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
LOG.warn("Semaphore acquire interrupted");
}
-
LOG.info("Responses received, ending...");
}
- @Override
- public Void call() {
- if (invokeAsync) {
- this.invokeAsync();
- } else {
- this.invokeSync();
+ private void sendSync(final List<HttpRequest> requests) {
+ LOG.info("Begin sending sync requests");
+ for (final HttpRequest request : requests) {
+ try {
+ final HttpResponse<String> response = httpClient.send(request, BodyHandlers.ofString());
+ if (response.statusCode() != 200) {
+ LOG.warn("Unexpected status code: {} for request to uri: {} with body: {}",
+ response.statusCode(), request.uri(), response.body());
+ }
+ } catch (final InterruptedException | IOException e) {
+ LOG.error("Failed to execute request: {}", request, e);
+ throw new RuntimeException("Failed to execute request", e);
+ }
}
- return null;
+ LOG.info("End sending sync requests");
+ }
+
+ private HttpRequest prepareRequest(final String payload) {
+ LOG.info("Creating request to: {} with payload: {}", destination, payload);
+ return HttpRequest.newBuilder(URI.create(destination))
+ .method("PATCH", BodyPublishers.ofString(payload, StandardCharsets.UTF_8))
+ .header("Content-Type", "application/json")
+ .header("Accept", "application/json")
+ .build();
}
}