180dad7d120fd42bbc8bc8777f1305a4530ae467
[netconf.git] / netconf / tools / netconf-testtool / src / main / java / org / opendaylight / netconf / test / tool / client / http / perf / AsyncExecutionStrategy.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.test.tool.client.http.perf;
10
11 import com.ning.http.client.AsyncCompletionHandler;
12 import com.ning.http.client.AsyncHttpClient;
13 import com.ning.http.client.HttpResponseStatus;
14 import com.ning.http.client.ListenableFuture;
15 import com.ning.http.client.Request;
16 import com.ning.http.client.Response;
17 import java.util.ArrayList;
18 import java.util.concurrent.Semaphore;
19 import org.opendaylight.netconf.test.tool.client.stress.ExecutionStrategy;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 public class AsyncExecutionStrategy implements ExecutionStrategy {
24
25     private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionStrategy.class);
26
27     private final Parameters params;
28     private final ArrayList<Request> payloads;
29     private final AsyncHttpClient asyncHttpClient;
30     private final Semaphore semaphore;
31
32     AsyncExecutionStrategy(final Parameters params, final AsyncHttpClient asyncHttpClient,
33                            final ArrayList<Request> payloads) {
34         this.params = params;
35         this.asyncHttpClient = asyncHttpClient;
36         this.payloads = payloads;
37         this.semaphore = new Semaphore(RestPerfClient.throttle);
38     }
39
40     @Override
41     public void invoke() {
42         final ArrayList<ListenableFuture<Response>> futures = new ArrayList<>();
43         LOG.info("Begin sending async requests");
44
45         for (final Request request : payloads) {
46             try {
47                 semaphore.acquire();
48             } catch (InterruptedException e) {
49                 LOG.warn("Semaphore acquire interrupted");
50             }
51             futures.add(asyncHttpClient.executeRequest(request, new AsyncCompletionHandler<Response>() {
52                 @Override
53                 public STATE onStatusReceived(HttpResponseStatus status) throws Exception {
54                     super.onStatusReceived(status);
55                     if (status.getStatusCode() != 200 && status.getStatusCode() != 204) {
56                         LOG.warn("Request failed, status code: {}", status.getStatusCode() + status.getStatusText());
57                         LOG.warn("request: {}", request.toString());
58                     }
59                     return STATE.CONTINUE;
60                 }
61
62                 @Override
63                 public Response onCompleted(Response response) throws Exception {
64                     semaphore.release();
65                     return response;
66                 }
67             }));
68         }
69         LOG.info("Requests sent, waiting for responses");
70
71         try {
72             semaphore.acquire(RestPerfClient.throttle);
73         } catch (InterruptedException e) {
74             LOG.warn("Semaphore acquire interrupted");
75         }
76
77         LOG.info("Responses received, ending...");
78     }
79 }