CDS: Add stress test RPC to the cars model
[controller.git] / opendaylight / netconf / netconf-testtool / src / main / java / org / opendaylight / controller / netconf / test / tool / client / http / perf / AsyncExecutionStrategy.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.test.tool.client.http.perf;
10
11 import com.ning.http.client.AsyncCompletionHandler;
12 import com.ning.http.client.AsyncHttpClient;
13 import com.ning.http.client.HttpResponseStatus;
14 import com.ning.http.client.ListenableFuture;
15 import com.ning.http.client.Request;
16 import com.ning.http.client.Response;
17 import java.util.ArrayList;
18 import java.util.concurrent.Semaphore;
19 import org.opendaylight.controller.netconf.test.tool.client.stress.ExecutionStrategy;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 public class AsyncExecutionStrategy implements ExecutionStrategy{
24
25     private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionStrategy.class);
26
27     private final Parameters params;
28     private final ArrayList<Request> payloads;
29     private final AsyncHttpClient asyncHttpClient;
30     private final Semaphore semaphore;
31
32     AsyncExecutionStrategy(final Parameters params, final AsyncHttpClient asyncHttpClient, final ArrayList<Request> payloads) {
33         this.params = params;
34         this.asyncHttpClient = asyncHttpClient;
35         this.payloads = payloads;
36         this.semaphore = new Semaphore(RestPerfClient.throttle);
37     }
38
39     @Override
40     public void invoke() {
41         final ArrayList<ListenableFuture<Response>> futures = new ArrayList<>();
42         LOG.info("Begin sending async requests");
43
44         for (final Request request : payloads) {
45             try {
46                 semaphore.acquire();
47             } catch (InterruptedException e) {
48                 LOG.warn("Semaphore acquire interrupted");
49             }
50             futures.add(asyncHttpClient.executeRequest(request, new AsyncCompletionHandler<Response>() {
51                 @Override
52                 public STATE onStatusReceived(HttpResponseStatus status) throws Exception {
53                     super.onStatusReceived(status);
54                     if (status.getStatusCode() != 200 && status.getStatusCode() != 204) {
55                         LOG.warn("Request failed, status code: {}", status.getStatusCode() + status.getStatusText());
56                         LOG.warn("request: {}", request.toString());
57                     }
58                     return STATE.CONTINUE;
59                 }
60
61                 @Override
62                 public Response onCompleted(Response response) throws Exception {
63                     semaphore.release();
64                     return response;
65                 }
66             }));
67         }
68         LOG.info("Requests sent, waiting for responses");
69
70         try {
71             semaphore.acquire(RestPerfClient.throttle);
72         } catch (InterruptedException e) {
73             LOG.warn("Semaphore acquire interrupted");
74         }
75
76         LOG.info("Responses received, ending...");
77     }
78 }