75aefea37f79d34a4e2371c50812233f3e223d78
[netconf.git] / netconf / tools / netconf-testtool / src / main / java / org / opendaylight / netconf / test / tool / Execution.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.test.tool;
9
10 import com.google.common.collect.Lists;
11 import java.io.IOException;
12 import java.net.Authenticator;
13 import java.net.PasswordAuthentication;
14 import java.net.URI;
15 import java.net.http.HttpClient;
16 import java.net.http.HttpRequest;
17 import java.net.http.HttpRequest.BodyPublishers;
18 import java.net.http.HttpResponse;
19 import java.net.http.HttpResponse.BodyHandlers;
20 import java.nio.charset.StandardCharsets;
21 import java.util.List;
22 import java.util.Locale;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.Semaphore;
25 import java.util.stream.Collectors;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 final class Execution implements Callable<Void> {
30     private static final Logger LOG = LoggerFactory.getLogger(Execution.class);
31     private static final String NETCONF_TOPOLOGY_DESTINATION =
32             "http://%s:%s/rests/data/network-topology:network-topology/topology=topology-netconf";
33
34     private final HttpClient httpClient;
35     private final String destination;
36     private final List<Integer> openDevices;
37     private final TesttoolParameters params;
38     private final Semaphore semaphore;
39
40     private final int throttle;
41     private final boolean isAsync;
42
43     Execution(final List<Integer> openDevices, final TesttoolParameters params) {
44         httpClient = HttpClient.newBuilder()
45                 .authenticator(new Authenticator() {
46                     @Override
47                     protected PasswordAuthentication getPasswordAuthentication() {
48                         return new PasswordAuthentication(params.controllerAuthUsername,
49                                 params.controllerAuthPassword.toCharArray());
50                     }
51                 })
52                 .build();
53         destination = String.format(Locale.ROOT, NETCONF_TOPOLOGY_DESTINATION,
54                 params.controllerIp, params.controllerPort);
55         this.openDevices = openDevices;
56         this.params = params;
57
58         throttle = params.throttle / params.threadAmount;
59         isAsync = params.async;
60
61         if (params.async && params.threadAmount > 1) {
62             LOG.info("Throttling per thread: {}", throttle);
63         }
64         semaphore = new Semaphore(throttle);
65     }
66
67     @Override
68     public Void call() {
69         final List<HttpRequest> requests = prepareRequests();
70         if (isAsync) {
71             this.sendAsync(requests);
72         } else {
73             this.sendSync(requests);
74         }
75         return null;
76     }
77
78     private List<HttpRequest> prepareRequests() {
79         final List<List<Integer>> batches = Lists.partition(openDevices, params.generateConfigBatchSize);
80         return batches.stream()
81                 .map(b -> PayloadCreator.createStringPayload(b, params))
82                 .map(this::prepareRequest)
83                 .collect(Collectors.toList());
84     }
85
86     private void sendAsync(final List<HttpRequest> requests) {
87         LOG.info("Begin sending async requests");
88         for (final HttpRequest request : requests) {
89             try {
90                 semaphore.acquire();
91             } catch (final InterruptedException e) {
92                 LOG.warn("Semaphore acquire interrupted");
93             }
94             httpClient.sendAsync(request, BodyHandlers.ofString()).whenComplete((response, error) -> {
95                 if (response.statusCode() != 200) {
96                     LOG.warn("Unexpected status code: {} for request to uri: {} with body: {}",
97                             response.statusCode(), request.uri(), response.body());
98                 }
99                 semaphore.release();
100             });
101         }
102         LOG.info("Requests sent, waiting for responses");
103         try {
104             semaphore.acquire(this.throttle);
105         } catch (final InterruptedException e) {
106             LOG.warn("Semaphore acquire interrupted");
107         }
108         LOG.info("Responses received, ending...");
109     }
110
111     private void sendSync(final List<HttpRequest> requests) {
112         LOG.info("Begin sending sync requests");
113         for (final HttpRequest request : requests) {
114             try {
115                 final HttpResponse<String> response = httpClient.send(request, BodyHandlers.ofString());
116                 if (response.statusCode() != 200) {
117                     LOG.warn("Unexpected status code: {} for request to uri: {} with body: {}",
118                             response.statusCode(), request.uri(), response.body());
119                 }
120             } catch (final InterruptedException | IOException e) {
121                 LOG.error("Failed to execute request: {}", request, e);
122                 throw new RuntimeException("Failed to execute request", e);
123             }
124         }
125         LOG.info("End sending sync requests");
126     }
127
128     private HttpRequest prepareRequest(final String payload) {
129         LOG.info("Creating request to: {} with payload: {}", destination, payload);
130         return HttpRequest.newBuilder(URI.create(destination))
131                 .method("PATCH", BodyPublishers.ofString(payload, StandardCharsets.UTF_8))
132                 .header("Content-Type", "application/json")
133                 .header("Accept", "application/json")
134                 .build();
135     }
136 }