Bump upstream versions
[netconf.git] / netconf / tools / netconf-testtool / src / main / java / org / opendaylight / netconf / test / tool / Execution.java
index e2799d86614888bb2668117e46026d8b5d2c717b..a13ad5290623860bb53497c753366b4e7b0907d4 100644 (file)
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
-
 package org.opendaylight.netconf.test.tool;
 
-import com.ning.http.client.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import com.google.common.collect.Lists;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.net.Authenticator;
+import java.net.PasswordAuthentication;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpRequest.BodyPublishers;
+import java.net.http.HttpResponse;
+import java.net.http.HttpResponse.BodyHandlers;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Locale;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class Execution implements Callable<Void> {
-
-    private final ArrayList<Request> payloads;
-    private final AsyncHttpClient asyncHttpClient;
+final class Execution implements Callable<Void> {
     private static final Logger LOG = LoggerFactory.getLogger(Execution.class);
-    private final boolean invokeAsync;
-    private final Semaphore semaphore;
-    private final int throttle;
+    private static final String NETCONF_TOPOLOGY_DESTINATION =
+            "http://%s:%s/rests/data/network-topology:network-topology/topology=topology-netconf";
 
-    static final class DestToPayload {
-
-        private final String destination;
-        private final String payload;
-
-        public DestToPayload(String destination, String payload) {
-            this.destination = destination;
-            this.payload = payload;
-        }
-
-        public String getDestination() {
-            return destination;
-        }
+    private final HttpClient httpClient;
+    private final String destination;
+    private final List<Integer> openDevices;
+    private final TesttoolParameters params;
+    private final Semaphore semaphore;
 
-        public String getPayload() {
-            return payload;
-        }
-    }
+    private final int throttle;
+    private final boolean isAsync;
+
+    Execution(final List<Integer> openDevices, final TesttoolParameters params) {
+        httpClient = HttpClient.newBuilder()
+                .authenticator(new Authenticator() {
+                    @Override
+                    protected PasswordAuthentication getPasswordAuthentication() {
+                        return new PasswordAuthentication(params.controllerAuthUsername,
+                                params.controllerAuthPassword.toCharArray());
+                    }
+                })
+                .build();
+        destination = String.format(Locale.ROOT, NETCONF_TOPOLOGY_DESTINATION,
+                params.controllerIp, params.controllerPort);
+        this.openDevices = openDevices;
+        this.params = params;
 
-    public Execution(TesttoolParameters params, ArrayList<DestToPayload> payloads) {
-        this.invokeAsync = params.async;
-        this.throttle = params.throttle / params.threadAmount;
+        throttle = params.throttle / params.threadAmount;
+        isAsync = params.async;
 
         if (params.async && params.threadAmount > 1) {
-            LOG.info("Throttling per thread: {}", this.throttle);
+            LOG.info("Throttling per thread: {}", throttle);
         }
-        this.semaphore = new Semaphore(this.throttle);
-
-        this.asyncHttpClient = new AsyncHttpClient(new AsyncHttpClientConfig.Builder()
-                .setConnectTimeout(Integer.MAX_VALUE)
-                .setRequestTimeout(Integer.MAX_VALUE)
-                .setAllowPoolingConnections(true)
-                .build());
-
-        this.payloads = new ArrayList<>();
-        for (DestToPayload payload : payloads) {
-            AsyncHttpClient.BoundRequestBuilder requestBuilder = asyncHttpClient.preparePost(payload.getDestination())
-                    .addHeader("Content-Type", "application/json")
-                    .addHeader("Accept", "application/json")
-                    .setBody(payload.getPayload())
-                    .setRequestTimeout(Integer.MAX_VALUE);
+        semaphore = new Semaphore(throttle);
+    }
 
-            if (params.auth != null) {
-                requestBuilder.setRealm(new Realm.RealmBuilder()
-                        .setScheme(Realm.AuthScheme.BASIC)
-                        .setPrincipal(params.auth.get(0))
-                        .setPassword(params.auth.get(1))
-                        .setMethodName("POST")
-                        .setUsePreemptiveAuth(true)
-                        .build());
-            }
-            this.payloads.add(requestBuilder.build());
+    @Override
+    public Void call() {
+        final List<HttpRequest> requests = prepareRequests();
+        if (isAsync) {
+            sendAsync(requests);
+        } else {
+            sendSync(requests);
         }
+        return null;
     }
 
-    private void invokeSync() {
-        LOG.info("Begin sending sync requests");
-        for (Request request : payloads) {
-            try {
-                Response response = asyncHttpClient.executeRequest(request).get();
-                if (response.getStatusCode() != 200 && response.getStatusCode() != 204) {
-                    if (response.getStatusCode() == 409) {
-                        LOG.warn("Request failed, status code: {} - one or more of the devices" +
-                                " is already configured, skipping the whole batch", response.getStatusCode());
-                    } else {
-                        LOG.warn("Status code: {}", response.getStatusCode());
-                        LOG.warn("url: {}", request.getUrl());
-                        LOG.warn(response.getResponseBody());
-                    }
-                }
-            } catch (InterruptedException | ExecutionException | IOException e) {
-                LOG.warn(e.toString());
-            }
-        }
-        LOG.info("End sending sync requests");
+    private List<HttpRequest> prepareRequests() {
+        final List<List<Integer>> batches = Lists.partition(openDevices, params.generateConfigBatchSize);
+        return batches.stream()
+                .map(b -> PayloadCreator.createStringPayload(b, params))
+                .map(this::prepareRequest)
+                .collect(Collectors.toList());
     }
 
-    private void invokeAsync() {
-        final ArrayList<ListenableFuture<Response>> futures = new ArrayList<>();
+    private void sendAsync(final List<HttpRequest> requests) {
         LOG.info("Begin sending async requests");
-
-        for (final Request request : payloads) {
+        for (final HttpRequest request : requests) {
             try {
                 semaphore.acquire();
-            } catch (InterruptedException e) {
+            } catch (final InterruptedException e) {
                 LOG.warn("Semaphore acquire interrupted");
             }
-            futures.add(asyncHttpClient.executeRequest(request, new AsyncCompletionHandler<Response>() {
-                @Override
-                public STATE onStatusReceived(HttpResponseStatus status) throws Exception {
-                    super.onStatusReceived(status);
-                    if (status.getStatusCode() != 200 && status.getStatusCode() != 204) {
-                        if (status.getStatusCode() == 409) {
-                            LOG.warn("Request failed, status code: {} - one or more of the devices" +
-                                    " is already configured, skipping the whole batch", status.getStatusCode());
-                        } else {
-                            LOG.warn("Request failed, status code: {}", status.getStatusCode() + status.getStatusText());
-                            LOG.warn("request: {}", request.toString());
-                        }
-                    }
-                    return STATE.CONTINUE;
-                }
-
-                @Override
-                public Response onCompleted(Response response) throws Exception {
-                    semaphore.release();
-                    return response;
+            httpClient.sendAsync(request, BodyHandlers.ofString()).whenComplete((response, error) -> {
+                if (response.statusCode() != 200) {
+                    LOG.warn("Unexpected status code: {} for request to uri: {} with body: {}",
+                            response.statusCode(), request.uri(), response.body());
                 }
-            }));
+                semaphore.release();
+            });
         }
         LOG.info("Requests sent, waiting for responses");
-
         try {
-            semaphore.acquire(this.throttle);
-        } catch (InterruptedException e) {
+            semaphore.acquire(throttle);
+        } catch (final InterruptedException e) {
             LOG.warn("Semaphore acquire interrupted");
         }
-
         LOG.info("Responses received, ending...");
     }
 
-    @Override
-    public Void call() throws Exception {
-        if (invokeAsync) {
-            this.invokeAsync();
-        } else {
-            this.invokeSync();
+    private void sendSync(final List<HttpRequest> requests) {
+        LOG.info("Begin sending sync requests");
+        for (final HttpRequest request : requests) {
+            try {
+                final HttpResponse<String> response = httpClient.send(request, BodyHandlers.ofString());
+                if (response.statusCode() != 200) {
+                    LOG.warn("Unexpected status code: {} for request to uri: {} with body: {}",
+                            response.statusCode(), request.uri(), response.body());
+                }
+            } catch (final InterruptedException | IOException e) {
+                LOG.error("Failed to execute request: {}", request, e);
+                throw new IllegalStateException("Failed to execute request", e);
+            }
         }
-        return null;
+        LOG.info("End sending sync requests");
+    }
+
+    private HttpRequest prepareRequest(final String payload) {
+        LOG.info("Creating request to: {} with payload: {}", destination, payload);
+        return HttpRequest.newBuilder(URI.create(destination))
+                .method("PATCH", BodyPublishers.ofString(payload, StandardCharsets.UTF_8))
+                .header("Content-Type", "application/json")
+                .header("Accept", "application/json")
+                .build();
     }
 }