4b1470a497697e54335e67326a8c2a2ea5fc09b1
[netconf.git] / netconf / tools / netconf-testtool / src / main / java / org / opendaylight / netconf / test / tool / Execution.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.test.tool;
9
10 import com.ning.http.client.AsyncCompletionHandler;
11 import com.ning.http.client.AsyncHttpClient;
12 import com.ning.http.client.AsyncHttpClientConfig;
13 import com.ning.http.client.HttpResponseStatus;
14 import com.ning.http.client.Realm;
15 import com.ning.http.client.Request;
16 import com.ning.http.client.Response;
17 import java.io.IOException;
18 import java.util.ArrayList;
19 import java.util.concurrent.Callable;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.Semaphore;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 public class Execution implements Callable<Void> {
26
27     private final ArrayList<Request> payloads;
28     private final AsyncHttpClient asyncHttpClient;
29     private static final Logger LOG = LoggerFactory.getLogger(Execution.class);
30     private final boolean invokeAsync;
31     private final Semaphore semaphore;
32     private final int throttle;
33
34     static final class DestToPayload {
35
36         private final String destination;
37         private final String payload;
38
39         DestToPayload(final String destination, final String payload) {
40             this.destination = destination;
41             this.payload = payload;
42         }
43
44         public String getDestination() {
45             return destination;
46         }
47
48         public String getPayload() {
49             return payload;
50         }
51     }
52
53     public Execution(final TesttoolParameters params, final ArrayList<DestToPayload> payloads) {
54         this.invokeAsync = params.async;
55         this.throttle = params.throttle / params.threadAmount;
56
57         if (params.async && params.threadAmount > 1) {
58             LOG.info("Throttling per thread: {}", this.throttle);
59         }
60         this.semaphore = new Semaphore(this.throttle);
61
62         this.asyncHttpClient = new AsyncHttpClient(new AsyncHttpClientConfig.Builder()
63                 .setConnectTimeout(Integer.MAX_VALUE)
64                 .setRequestTimeout(Integer.MAX_VALUE)
65                 .setAllowPoolingConnections(true)
66                 .build());
67
68         this.payloads = new ArrayList<>();
69         for (DestToPayload payload : payloads) {
70             AsyncHttpClient.BoundRequestBuilder requestBuilder = asyncHttpClient.preparePost(payload.getDestination())
71                     .addHeader("Content-Type", "application/json")
72                     .addHeader("Accept", "application/json")
73                     .setBody(payload.getPayload())
74                     .setRequestTimeout(Integer.MAX_VALUE);
75
76             requestBuilder.setRealm(new Realm.RealmBuilder()
77                     .setScheme(Realm.AuthScheme.BASIC)
78                     .setPrincipal(params.controllerAuthUsername)
79                     .setPassword(params.controllerAuthPassword)
80                     .setMethodName("POST")
81                     .setUsePreemptiveAuth(true)
82                     .build());
83
84             this.payloads.add(requestBuilder.build());
85         }
86     }
87
88     private void invokeSync() {
89         LOG.info("Begin sending sync requests");
90         for (Request request : payloads) {
91             try {
92                 Response response = asyncHttpClient.executeRequest(request).get();
93                 if (response.getStatusCode() != 200 && response.getStatusCode() != 204) {
94                     if (response.getStatusCode() == 409) {
95                         LOG.warn("Request failed, status code: {} - one or more of the devices"
96                                 + " is already configured, skipping the whole batch", response.getStatusCode());
97                     } else {
98                         LOG.warn("Status code: {}", response.getStatusCode());
99                         LOG.warn("url: {}", request.getUrl());
100                         LOG.warn("body: {}", response.getResponseBody());
101                     }
102                 }
103             } catch (InterruptedException | ExecutionException | IOException e) {
104                 LOG.warn("Failed to execute request", e);
105             }
106         }
107         LOG.info("End sending sync requests");
108     }
109
110     private void invokeAsync() {
111         LOG.info("Begin sending async requests");
112
113         for (final Request request : payloads) {
114             try {
115                 semaphore.acquire();
116             } catch (InterruptedException e) {
117                 LOG.warn("Semaphore acquire interrupted");
118             }
119             asyncHttpClient.executeRequest(request, new AsyncCompletionHandler<Response>() {
120                 @Override
121                 public STATE onStatusReceived(final HttpResponseStatus status) throws Exception {
122                     super.onStatusReceived(status);
123                     if (status.getStatusCode() != 200 && status.getStatusCode() != 204) {
124                         if (status.getStatusCode() == 409) {
125                             LOG.warn("Request failed, status code: {} - one or more of the devices"
126                                     + " is already configured, skipping the whole batch", status.getStatusCode());
127                         } else {
128                             LOG.warn("Request failed, status code: {}",
129                                 status.getStatusCode() + status.getStatusText());
130                             LOG.warn("request: {}", request.toString());
131                         }
132                     }
133                     return STATE.CONTINUE;
134                 }
135
136                 @Override
137                 public Response onCompleted(final Response response) {
138                     semaphore.release();
139                     return response;
140                 }
141             });
142         }
143         LOG.info("Requests sent, waiting for responses");
144
145         try {
146             semaphore.acquire(this.throttle);
147         } catch (InterruptedException e) {
148             LOG.warn("Semaphore acquire interrupted");
149         }
150
151         LOG.info("Responses received, ending...");
152     }
153
154     @Override
155     public Void call() {
156         if (invokeAsync) {
157             this.invokeAsync();
158         } else {
159             this.invokeSync();
160         }
161         return null;
162     }
163 }