Fix testtool device registration
[netconf.git] / netconf / tools / netconf-testtool / src / main / java / org / opendaylight / netconf / test / tool / Execution.java
index 208ee76c84eeff086fc2f17469ae724e1e2b287b..05957204b73fc78ca07707fd85ddd0458dbe66b5 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.netconf.test.tool;
 
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.net.Authenticator;
 import java.net.PasswordAuthentication;
@@ -17,50 +18,30 @@ import java.net.http.HttpRequest.BodyPublishers;
 import java.net.http.HttpResponse;
 import java.net.http.HttpResponse.BodyHandlers;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Semaphore;
+import java.util.stream.Collectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class Execution implements Callable<Void> {
+final class Execution implements Callable<Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(Execution.class);
+    private static final String NETCONF_TOPOLOGY_DESTINATION =
+            "http://%s:%s/restconf/config/network-topology:network-topology/topology/topology-netconf";
 
-    private final ArrayList<HttpRequest> payloads;
     private final HttpClient httpClient;
-    private static final Logger LOG = LoggerFactory.getLogger(Execution.class);
-    private final boolean invokeAsync;
+    private final String destination;
+    private final List<Integer> openDevices;
+    private final TesttoolParameters params;
     private final Semaphore semaphore;
-    private final int throttle;
-
-    static final class DestToPayload {
-
-        private final String destination;
-        private final String payload;
-
-        DestToPayload(final String destination, final String payload) {
-            this.destination = destination;
-            this.payload = payload;
-        }
-
-        public String getDestination() {
-            return destination;
-        }
-
-        public String getPayload() {
-            return payload;
-        }
-    }
 
-    public Execution(final TesttoolParameters params, final ArrayList<DestToPayload> payloads) {
-        this.invokeAsync = params.async;
-        this.throttle = params.throttle / params.threadAmount;
-
-        if (params.async && params.threadAmount > 1) {
-            LOG.info("Throttling per thread: {}", this.throttle);
-        }
-        this.semaphore = new Semaphore(this.throttle);
+    private final int throttle;
+    private final boolean isAsync;
 
-        this.httpClient = HttpClient.newBuilder()
+    Execution(final List<Integer> openDevices, final TesttoolParameters params) {
+        httpClient = HttpClient.newBuilder()
                 .authenticator(new Authenticator() {
                     @Override
                     protected PasswordAuthentication getPasswordAuthentication() {
@@ -69,81 +50,87 @@ public class Execution implements Callable<Void> {
                     }
                 })
                 .build();
-        this.payloads = new ArrayList<>();
-        for (DestToPayload payload : payloads) {
-            HttpRequest request = HttpRequest.newBuilder(URI.create(payload.getDestination()))
-                    .POST(BodyPublishers.ofString(payload.getPayload(), StandardCharsets.UTF_8))
-                    .header("Content-Type", "application/json")
-                    .header("Accept", "application/json")
-                    .build();
+        destination = String.format(Locale.ROOT, NETCONF_TOPOLOGY_DESTINATION,
+                params.controllerIp, params.controllerPort);
+        this.openDevices = openDevices;
+        this.params = params;
+
+        throttle = params.throttle / params.threadAmount;
+        isAsync = params.async;
 
-            this.payloads.add(request);
+        if (params.async && params.threadAmount > 1) {
+            LOG.info("Throttling per thread: {}", throttle);
         }
+        semaphore = new Semaphore(throttle);
     }
 
-    private void invokeSync() {
-        LOG.info("Begin sending sync requests");
-        for (HttpRequest request : payloads) {
-            try {
-                HttpResponse<String> response = httpClient.send(request, BodyHandlers.ofString());
-
-                if (response.statusCode() != 200 && response.statusCode() != 204) {
-                    if (response.statusCode() == 409) {
-                        LOG.warn("Request failed, status code: {} - one or more of the devices"
-                                + " is already configured, skipping the whole batch", response.statusCode());
-                    } else {
-                        LOG.warn("Status code: {}", response.statusCode());
-                        LOG.warn("url: {}", request.uri());
-                        LOG.warn("body: {}", response.body());
-                    }
-                }
-            } catch (InterruptedException | IOException e) {
-                LOG.warn("Failed to execute request", e);
-            }
+    @Override
+    public Void call() {
+        final List<HttpRequest> requests = prepareRequests();
+        if (isAsync) {
+            this.sendAsync(requests);
+        } else {
+            this.sendSync(requests);
         }
-        LOG.info("End sending sync requests");
+        return null;
     }
 
-    private void invokeAsync() {
-        LOG.info("Begin sending async requests");
+    private List<HttpRequest> prepareRequests() {
+        final List<List<Integer>> batches = Lists.partition(openDevices, params.generateConfigBatchSize);
+        return batches.stream()
+                .map(b -> PayloadCreator.createStringPayload(b, params))
+                .map(this::prepareRequest)
+                .collect(Collectors.toList());
+    }
 
-        for (final HttpRequest request : payloads) {
+    private void sendAsync(final List<HttpRequest> requests) {
+        LOG.info("Begin sending async requests");
+        for (final HttpRequest request : requests) {
             try {
                 semaphore.acquire();
-            } catch (InterruptedException e) {
+            } catch (final InterruptedException e) {
                 LOG.warn("Semaphore acquire interrupted");
             }
             httpClient.sendAsync(request, BodyHandlers.ofString()).whenComplete((response, error) -> {
-                if (response.statusCode() != 200 && response.statusCode() != 204) {
-                    if (response.statusCode() == 409) {
-                        LOG.warn("Request failed, status code: {} - one or more of the devices"
-                                + " is already configured, skipping the whole batch", response.statusCode());
-                    } else {
-                        LOG.warn("Request failed, status code: {}", response.statusCode());
-                        LOG.warn("request: {}", request);
-                    }
+                if (response.statusCode() != 200) {
+                    LOG.warn("Unexpected status code: {} for request to uri: {} with body: {}",
+                            response.statusCode(), request.uri(), response.body());
                 }
                 semaphore.release();
             });
         }
         LOG.info("Requests sent, waiting for responses");
-
         try {
             semaphore.acquire(this.throttle);
-        } catch (InterruptedException e) {
+        } catch (final InterruptedException e) {
             LOG.warn("Semaphore acquire interrupted");
         }
-
         LOG.info("Responses received, ending...");
     }
 
-    @Override
-    public Void call() {
-        if (invokeAsync) {
-            this.invokeAsync();
-        } else {
-            this.invokeSync();
+    private void sendSync(final List<HttpRequest> requests) {
+        LOG.info("Begin sending sync requests");
+        for (final HttpRequest request : requests) {
+            try {
+                final HttpResponse<String> response = httpClient.send(request, BodyHandlers.ofString());
+                if (response.statusCode() != 200) {
+                    LOG.warn("Unexpected status code: {} for request to uri: {} with body: {}",
+                            response.statusCode(), request.uri(), response.body());
+                }
+            } catch (final InterruptedException | IOException e) {
+                LOG.error("Failed to execute request: {}", request, e);
+                throw new RuntimeException("Failed to execute request", e);
+            }
         }
-        return null;
+        LOG.info("End sending sync requests");
+    }
+
+    private HttpRequest prepareRequest(final String payload) {
+        LOG.info("Creating request to: {} with payload: {}", destination, payload);
+        return HttpRequest.newBuilder(URI.create(destination))
+                .method("PATCH", BodyPublishers.ofString(payload, StandardCharsets.UTF_8))
+                .header("Content-Type", "application/json")
+                .header("Accept", "application/json")
+                .build();
     }
 }