50b0478628beb68ff332ba738b28f7b78c20c1eb
[controller.git] / benchmark / ntfbenchmark / src / main / java / ntfbenchmark / impl / NtfbenchmarkProvider.java
1 /*
2  * Copyright (c) 2015 Cisco Systems Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package ntfbenchmark.impl;
9
10 import static com.google.common.base.Verify.verifyNotNull;
11
12 import com.google.common.util.concurrent.ListenableFuture;
13 import java.util.ArrayList;
14 import java.util.List;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.TimeUnit;
19 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
20 import org.opendaylight.mdsal.binding.api.NotificationService;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.NtfbenchmarkService;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput.ProducerType;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusOutput;
28 import org.opendaylight.yangtools.concepts.ListenerRegistration;
29 import org.opendaylight.yangtools.yang.common.RpcResult;
30 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
31 import org.opendaylight.yangtools.yang.common.Uint32;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 public class NtfbenchmarkProvider implements AutoCloseable, NtfbenchmarkService {
36     private static final Logger LOG = LoggerFactory.getLogger(NtfbenchmarkProvider.class);
37     private static final int TEST_TIMEOUT = 5;
38
39     private final NotificationService listenService;
40     private final NotificationPublishService publishService;
41
42     public NtfbenchmarkProvider(final NotificationService listenServiceDependency,
43             final NotificationPublishService publishServiceDependency) {
44         LOG.debug("NtfbenchmarkProvider Constructor");
45         listenService = listenServiceDependency;
46         publishService = publishServiceDependency;
47     }
48
49     public void init() {
50         LOG.info("NtfbenchmarkProvider initiated");
51     }
52
53     @Override
54     public void close() {
55         LOG.info("NtfbenchmarkProvider closed");
56     }
57
58     @Override
59     public ListenableFuture<RpcResult<StartTestOutput>> startTest(final StartTestInput input) {
60         final int producerCount = input.getProducers().intValue();
61         final int listenerCount = input.getListeners().intValue();
62         final int iterations = input.getIterations().intValue();
63         final int payloadSize = input.getIterations().intValue();
64
65         final List<AbstractNtfbenchProducer> producers = new ArrayList<>(producerCount);
66         final List<ListenerRegistration<NtfbenchTestListener>> listeners = new ArrayList<>(listenerCount);
67         for (int i = 0; i < producerCount; i++) {
68             producers.add(new NtfbenchBlockingProducer(publishService, iterations, payloadSize));
69         }
70         int expectedCntPerListener = producerCount * iterations;
71
72         for (int i = 0; i < listenerCount; i++) {
73             final NtfbenchTestListener listener;
74             if (input.getProducerType() == ProducerType.BLOCKING) {
75                 listener = new NtfbenchWTCListener(payloadSize, expectedCntPerListener);
76             } else {
77                 listener = new NtfbenchTestListener(payloadSize);
78             }
79             listeners.add(listenService.registerNotificationListener(listener));
80         }
81
82         try {
83             final ExecutorService executor = Executors.newFixedThreadPool(input.getProducers().intValue());
84
85             LOG.info("Test Started");
86             final long startTime = System.nanoTime();
87
88             for (int i = 0; i < input.getProducers().intValue(); i++) {
89                 // FIXME: fools RV_RETURN_VALUE_IGNORED_BAD_PRACTICE for now, but we should check some more
90                 verifyNotNull(executor.submit(producers.get(i)));
91             }
92             executor.shutdown();
93             try {
94                 executor.awaitTermination(TEST_TIMEOUT, TimeUnit.MINUTES);
95                 for (ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
96                     listenerRegistration.getInstance().getAllDone().get();
97                 }
98             } catch (final InterruptedException | ExecutionException e) {
99                 LOG.error("Out of time: test did not finish within the {} min deadline ", TEST_TIMEOUT);
100             }
101
102             final long producerEndTime = System.nanoTime();
103             final long producerElapsedTime = producerEndTime - startTime;
104
105             long allListeners = 0;
106             long allProducersOk = 0;
107             long allProducersError = 0;
108
109             for (final ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
110                 allListeners += listenerRegistration.getInstance().getReceived();
111             }
112
113             final long listenerElapsedTime = producerEndTime - startTime;
114
115             LOG.info("Test Done");
116
117             for (final AbstractNtfbenchProducer abstractNtfbenchProducer : producers) {
118                 allProducersOk += abstractNtfbenchProducer.getNtfOk();
119                 allProducersError += abstractNtfbenchProducer.getNtfError();
120             }
121
122             final StartTestOutput output =
123                     new StartTestOutputBuilder()
124                             .setProducerElapsedTime(Uint32.valueOf(producerElapsedTime / 1000000))
125                             .setListenerElapsedTime(Uint32.valueOf(listenerElapsedTime / 1000000))
126                             .setListenerOk(Uint32.valueOf(allListeners))
127                             .setProducerOk(Uint32.valueOf(allProducersOk))
128                             .setProducerError(Uint32.valueOf(allProducersError))
129                             .setProducerRate(Uint32.valueOf((allProducersOk + allProducersError) * 1000000000
130                                 / producerElapsedTime))
131                             .setListenerRate(Uint32.valueOf(allListeners * 1000000000 / listenerElapsedTime))
132                            .build();
133             return RpcResultBuilder.success(output).buildFuture();
134         } finally {
135             for (final ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
136                 listenerRegistration.close();
137             }
138         }
139     }
140
141     @Override
142     public ListenableFuture<RpcResult<TestStatusOutput>> testStatus(final TestStatusInput input) {
143         // TODO Auto-generated method stub
144         return null;
145     }
146
147 }