8a0595fde6038f4828ddb313ed6570b5f2c9f3be
[netconf.git] / netconf / tools / netconf-testtool / src / main / java / org / opendaylight / netconf / test / tool / Execution.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.test.tool;
9
10 import com.ning.http.client.AsyncCompletionHandler;
11 import com.ning.http.client.AsyncHttpClient;
12 import com.ning.http.client.AsyncHttpClientConfig;
13 import com.ning.http.client.HttpResponseStatus;
14 import com.ning.http.client.Realm;
15 import com.ning.http.client.Request;
16 import com.ning.http.client.Response;
17 import java.io.IOException;
18 import java.util.ArrayList;
19 import java.util.concurrent.Callable;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.Semaphore;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 public class Execution implements Callable<Void> {
26
27     private final ArrayList<Request> payloads;
28     private final AsyncHttpClient asyncHttpClient;
29     private static final Logger LOG = LoggerFactory.getLogger(Execution.class);
30     private final boolean invokeAsync;
31     private final Semaphore semaphore;
32     private final int throttle;
33
34     static final class DestToPayload {
35
36         private final String destination;
37         private final String payload;
38
39         DestToPayload(final String destination, final String payload) {
40             this.destination = destination;
41             this.payload = payload;
42         }
43
44         public String getDestination() {
45             return destination;
46         }
47
48         public String getPayload() {
49             return payload;
50         }
51     }
52
53     public Execution(final TesttoolParameters params, final ArrayList<DestToPayload> payloads) {
54         this.invokeAsync = params.async;
55         this.throttle = params.throttle / params.threadAmount;
56
57         if (params.async && params.threadAmount > 1) {
58             LOG.info("Throttling per thread: {}", this.throttle);
59         }
60         this.semaphore = new Semaphore(this.throttle);
61
62         this.asyncHttpClient = new AsyncHttpClient(new AsyncHttpClientConfig.Builder()
63                 .setConnectTimeout(Integer.MAX_VALUE)
64                 .setRequestTimeout(Integer.MAX_VALUE)
65                 .setAllowPoolingConnections(true)
66                 .build());
67
68         this.payloads = new ArrayList<>();
69         for (DestToPayload payload : payloads) {
70             AsyncHttpClient.BoundRequestBuilder requestBuilder = asyncHttpClient.preparePost(payload.getDestination())
71                     .addHeader("Content-Type", "application/json")
72                     .addHeader("Accept", "application/json")
73                     .setBody(payload.getPayload())
74                     .setRequestTimeout(Integer.MAX_VALUE);
75
76             if (params.auth != null) {
77                 requestBuilder.setRealm(new Realm.RealmBuilder()
78                         .setScheme(Realm.AuthScheme.BASIC)
79                         .setPrincipal(params.auth.get(0))
80                         .setPassword(params.auth.get(1))
81                         .setMethodName("POST")
82                         .setUsePreemptiveAuth(true)
83                         .build());
84             }
85             this.payloads.add(requestBuilder.build());
86         }
87     }
88
89     private void invokeSync() {
90         LOG.info("Begin sending sync requests");
91         for (Request request : payloads) {
92             try {
93                 Response response = asyncHttpClient.executeRequest(request).get();
94                 if (response.getStatusCode() != 200 && response.getStatusCode() != 204) {
95                     if (response.getStatusCode() == 409) {
96                         LOG.warn("Request failed, status code: {} - one or more of the devices"
97                                 + " is already configured, skipping the whole batch", response.getStatusCode());
98                     } else {
99                         LOG.warn("Status code: {}", response.getStatusCode());
100                         LOG.warn("url: {}", request.getUrl());
101                         LOG.warn("body: {}", response.getResponseBody());
102                     }
103                 }
104             } catch (InterruptedException | ExecutionException | IOException e) {
105                 LOG.warn("Failed to execute request", e);
106             }
107         }
108         LOG.info("End sending sync requests");
109     }
110
111     private void invokeAsync() {
112         LOG.info("Begin sending async requests");
113
114         for (final Request request : payloads) {
115             try {
116                 semaphore.acquire();
117             } catch (InterruptedException e) {
118                 LOG.warn("Semaphore acquire interrupted");
119             }
120             asyncHttpClient.executeRequest(request, new AsyncCompletionHandler<Response>() {
121                 @Override
122                 public STATE onStatusReceived(final HttpResponseStatus status) throws Exception {
123                     super.onStatusReceived(status);
124                     if (status.getStatusCode() != 200 && status.getStatusCode() != 204) {
125                         if (status.getStatusCode() == 409) {
126                             LOG.warn("Request failed, status code: {} - one or more of the devices"
127                                     + " is already configured, skipping the whole batch", status.getStatusCode());
128                         } else {
129                             LOG.warn("Request failed, status code: {}",
130                                 status.getStatusCode() + status.getStatusText());
131                             LOG.warn("request: {}", request.toString());
132                         }
133                     }
134                     return STATE.CONTINUE;
135                 }
136
137                 @Override
138                 public Response onCompleted(final Response response) {
139                     semaphore.release();
140                     return response;
141                 }
142             });
143         }
144         LOG.info("Requests sent, waiting for responses");
145
146         try {
147             semaphore.acquire(this.throttle);
148         } catch (InterruptedException e) {
149             LOG.warn("Semaphore acquire interrupted");
150         }
151
152         LOG.info("Responses received, ending...");
153     }
154
155     @Override
156     public Void call() {
157         if (invokeAsync) {
158             this.invokeAsync();
159         } else {
160             this.invokeSync();
161         }
162         return null;
163     }
164 }