* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
-
package org.opendaylight.netconf.test.tool;
-import com.ning.http.client.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import com.google.common.collect.Lists;
import java.io.IOException;
-import java.util.ArrayList;
+import java.net.Authenticator;
+import java.net.PasswordAuthentication;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpRequest.BodyPublishers;
+import java.net.http.HttpResponse;
+import java.net.http.HttpResponse.BodyHandlers;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Locale;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class Execution implements Callable<Void> {
-
- private final ArrayList<Request> payloads;
- private final AsyncHttpClient asyncHttpClient;
+final class Execution implements Callable<Void> {
private static final Logger LOG = LoggerFactory.getLogger(Execution.class);
- private final boolean invokeAsync;
- private final Semaphore semaphore;
- private final int throttle;
+ private static final String NETCONF_TOPOLOGY_DESTINATION =
+ "http://%s:%s/rests/data/network-topology:network-topology/topology=topology-netconf";
- static final class DestToPayload {
-
- private final String destination;
- private final String payload;
-
- public DestToPayload(String destination, String payload) {
- this.destination = destination;
- this.payload = payload;
- }
-
- public String getDestination() {
- return destination;
- }
+ private final HttpClient httpClient;
+ private final String destination;
+ private final List<Integer> openDevices;
+ private final TesttoolParameters params;
+ private final Semaphore semaphore;
- public String getPayload() {
- return payload;
- }
- }
+ private final int throttle;
+ private final boolean isAsync;
+
+ Execution(final List<Integer> openDevices, final TesttoolParameters params) {
+ httpClient = HttpClient.newBuilder()
+ .authenticator(new Authenticator() {
+ @Override
+ protected PasswordAuthentication getPasswordAuthentication() {
+ return new PasswordAuthentication(params.controllerAuthUsername,
+ params.controllerAuthPassword.toCharArray());
+ }
+ })
+ .build();
+ destination = String.format(Locale.ROOT, NETCONF_TOPOLOGY_DESTINATION,
+ params.controllerIp, params.controllerPort);
+ this.openDevices = openDevices;
+ this.params = params;
- public Execution(TesttoolParameters params, ArrayList<DestToPayload> payloads) {
- this.invokeAsync = params.async;
- this.throttle = params.throttle / params.threadAmount;
+ throttle = params.throttle / params.threadAmount;
+ isAsync = params.async;
if (params.async && params.threadAmount > 1) {
- LOG.info("Throttling per thread: {}", this.throttle);
+ LOG.info("Throttling per thread: {}", throttle);
}
- this.semaphore = new Semaphore(this.throttle);
-
- this.asyncHttpClient = new AsyncHttpClient(new AsyncHttpClientConfig.Builder()
- .setConnectTimeout(Integer.MAX_VALUE)
- .setRequestTimeout(Integer.MAX_VALUE)
- .setAllowPoolingConnections(true)
- .build());
-
- this.payloads = new ArrayList<>();
- for (DestToPayload payload : payloads) {
- AsyncHttpClient.BoundRequestBuilder requestBuilder = asyncHttpClient.preparePost(payload.getDestination())
- .addHeader("Content-Type", "application/json")
- .addHeader("Accept", "application/json")
- .setBody(payload.getPayload())
- .setRequestTimeout(Integer.MAX_VALUE);
+ semaphore = new Semaphore(throttle);
+ }
- if (params.auth != null) {
- requestBuilder.setRealm(new Realm.RealmBuilder()
- .setScheme(Realm.AuthScheme.BASIC)
- .setPrincipal(params.auth.get(0))
- .setPassword(params.auth.get(1))
- .setMethodName("POST")
- .setUsePreemptiveAuth(true)
- .build());
- }
- this.payloads.add(requestBuilder.build());
+ @Override
+ public Void call() {
+ final List<HttpRequest> requests = prepareRequests();
+ if (isAsync) {
+ sendAsync(requests);
+ } else {
+ sendSync(requests);
}
+ return null;
}
- private void invokeSync() {
- LOG.info("Begin sending sync requests");
- for (Request request : payloads) {
- try {
- Response response = asyncHttpClient.executeRequest(request).get();
- if (response.getStatusCode() != 200 && response.getStatusCode() != 204) {
- if (response.getStatusCode() == 409) {
- LOG.warn("Request failed, status code: {} - one or more of the devices" +
- " is already configured, skipping the whole batch", response.getStatusCode());
- } else {
- LOG.warn("Status code: {}", response.getStatusCode());
- LOG.warn("url: {}", request.getUrl());
- LOG.warn(response.getResponseBody());
- }
- }
- } catch (InterruptedException | ExecutionException | IOException e) {
- LOG.warn(e.toString());
- }
- }
- LOG.info("End sending sync requests");
+ private List<HttpRequest> prepareRequests() {
+ final List<List<Integer>> batches = Lists.partition(openDevices, params.generateConfigBatchSize);
+ return batches.stream()
+ .map(b -> PayloadCreator.createStringPayload(b, params))
+ .map(this::prepareRequest)
+ .collect(Collectors.toList());
}
- private void invokeAsync() {
- final ArrayList<ListenableFuture<Response>> futures = new ArrayList<>();
+ private void sendAsync(final List<HttpRequest> requests) {
LOG.info("Begin sending async requests");
-
- for (final Request request : payloads) {
+ for (final HttpRequest request : requests) {
try {
semaphore.acquire();
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
LOG.warn("Semaphore acquire interrupted");
}
- futures.add(asyncHttpClient.executeRequest(request, new AsyncCompletionHandler<Response>() {
- @Override
- public STATE onStatusReceived(HttpResponseStatus status) throws Exception {
- super.onStatusReceived(status);
- if (status.getStatusCode() != 200 && status.getStatusCode() != 204) {
- if (status.getStatusCode() == 409) {
- LOG.warn("Request failed, status code: {} - one or more of the devices" +
- " is already configured, skipping the whole batch", status.getStatusCode());
- } else {
- LOG.warn("Request failed, status code: {}", status.getStatusCode() + status.getStatusText());
- LOG.warn("request: {}", request.toString());
- }
- }
- return STATE.CONTINUE;
- }
-
- @Override
- public Response onCompleted(Response response) throws Exception {
- semaphore.release();
- return response;
+ httpClient.sendAsync(request, BodyHandlers.ofString()).whenComplete((response, error) -> {
+ if (response.statusCode() != 200) {
+ LOG.warn("Unexpected status code: {} for request to uri: {} with body: {}",
+ response.statusCode(), request.uri(), response.body());
}
- }));
+ semaphore.release();
+ });
}
LOG.info("Requests sent, waiting for responses");
-
try {
- semaphore.acquire(this.throttle);
- } catch (InterruptedException e) {
+ semaphore.acquire(throttle);
+ } catch (final InterruptedException e) {
LOG.warn("Semaphore acquire interrupted");
}
-
LOG.info("Responses received, ending...");
}
- @Override
- public Void call() throws Exception {
- if (invokeAsync) {
- this.invokeAsync();
- } else {
- this.invokeSync();
+ private void sendSync(final List<HttpRequest> requests) {
+ LOG.info("Begin sending sync requests");
+ for (final HttpRequest request : requests) {
+ try {
+ final HttpResponse<String> response = httpClient.send(request, BodyHandlers.ofString());
+ if (response.statusCode() != 200) {
+ LOG.warn("Unexpected status code: {} for request to uri: {} with body: {}",
+ response.statusCode(), request.uri(), response.body());
+ }
+ } catch (final InterruptedException | IOException e) {
+ LOG.error("Failed to execute request: {}", request, e);
+ throw new IllegalStateException("Failed to execute request", e);
+ }
}
- return null;
+ LOG.info("End sending sync requests");
+ }
+
+ private HttpRequest prepareRequest(final String payload) {
+ LOG.info("Creating request to: {} with payload: {}", destination, payload);
+ return HttpRequest.newBuilder(URI.create(destination))
+ .method("PATCH", BodyPublishers.ofString(payload, StandardCharsets.UTF_8))
+ .header("Content-Type", "application/json")
+ .header("Accept", "application/json")
+ .build();
}
}