2 * Copyright (c) 2015 Cisco Systems Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package ntfbenchmark.impl;
10 import static com.google.common.base.Verify.verifyNotNull;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import java.util.ArrayList;
14 import java.util.List;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.TimeUnit;
19 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
20 import org.opendaylight.mdsal.binding.api.NotificationService;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.NtfbenchmarkService;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput.ProducerType;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusOutput;
28 import org.opendaylight.yangtools.concepts.ListenerRegistration;
29 import org.opendaylight.yangtools.yang.common.RpcResult;
30 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
34 public class NtfbenchmarkProvider implements AutoCloseable, NtfbenchmarkService {
35 private static final Logger LOG = LoggerFactory.getLogger(NtfbenchmarkProvider.class);
36 private static final int TEST_TIMEOUT = 5;
38 private final NotificationService listenService;
39 private final NotificationPublishService publishService;
/**
 * Creates the benchmark provider around the two notification services it drives.
 *
 * @param listenServiceDependency service used to register the benchmark's notification listeners
 * @param publishServiceDependency service handed to the benchmark's notification producers
 */
public NtfbenchmarkProvider(final NotificationService listenServiceDependency,
        final NotificationPublishService publishServiceDependency) {
    LOG.debug("NtfbenchmarkProvider Constructor");
    this.listenService = listenServiceDependency;
    this.publishService = publishServiceDependency;
}
49 LOG.info("NtfbenchmarkProvider initiated");
54 LOG.info("NtfbenchmarkProvider closed");
58 public ListenableFuture<RpcResult<StartTestOutput>> startTest(final StartTestInput input) {
59 final int producerCount = input.getProducers().intValue();
60 final int listenerCount = input.getListeners().intValue();
61 final int iterations = input.getIterations().intValue();
62 final int payloadSize = input.getIterations().intValue();
64 final List<AbstractNtfbenchProducer> producers = new ArrayList<>(producerCount);
65 final List<ListenerRegistration<NtfbenchTestListener>> listeners = new ArrayList<>(listenerCount);
66 for (int i = 0; i < producerCount; i++) {
67 producers.add(new NtfbenchBlockingProducer(publishService, iterations, payloadSize));
69 int expectedCntPerListener = producerCount * iterations;
71 for (int i = 0; i < listenerCount; i++) {
72 final NtfbenchTestListener listener;
73 if (input.getProducerType() == ProducerType.BLOCKING) {
74 listener = new NtfbenchWTCListener(payloadSize, expectedCntPerListener);
76 listener = new NtfbenchTestListener(payloadSize);
78 listeners.add(listenService.registerNotificationListener(listener));
82 final ExecutorService executor = Executors.newFixedThreadPool(input.getProducers().intValue());
84 LOG.info("Test Started");
85 final long startTime = System.nanoTime();
87 for (int i = 0; i < input.getProducers().intValue(); i++) {
88 // FIXME: fools RV_RETURN_VALUE_IGNORED_BAD_PRACTICE for now, but we should check some more
89 verifyNotNull(executor.submit(producers.get(i)));
93 executor.awaitTermination(TEST_TIMEOUT, TimeUnit.MINUTES);
94 for (ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
95 listenerRegistration.getInstance().getAllDone().get();
97 } catch (final InterruptedException | ExecutionException e) {
98 LOG.error("Out of time: test did not finish within the {} min deadline ", TEST_TIMEOUT);
101 final long producerEndTime = System.nanoTime();
102 final long producerElapsedTime = producerEndTime - startTime;
104 long allListeners = 0;
105 long allProducersOk = 0;
106 long allProducersError = 0;
108 for (final ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
109 allListeners += listenerRegistration.getInstance().getReceived();
112 final long listenerElapsedTime = producerEndTime - startTime;
114 LOG.info("Test Done");
116 for (final AbstractNtfbenchProducer abstractNtfbenchProducer : producers) {
117 allProducersOk += abstractNtfbenchProducer.getNtfOk();
118 allProducersError += abstractNtfbenchProducer.getNtfError();
121 final StartTestOutput output =
122 new StartTestOutputBuilder()
123 .setProducerElapsedTime(producerElapsedTime / 1000000)
124 .setListenerElapsedTime(listenerElapsedTime / 1000000)
125 .setListenerOk(allListeners)
126 .setProducerOk(allProducersOk)
127 .setProducerError(allProducersError)
128 .setProducerRate((allProducersOk + allProducersError) * 1000000000 / producerElapsedTime)
129 .setListenerRate(allListeners * 1000000000 / listenerElapsedTime)
131 return RpcResultBuilder.success(output).buildFuture();
133 for (final ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
134 listenerRegistration.close();
140 public ListenableFuture<RpcResult<TestStatusOutput>> testStatus(final TestStatusInput input) {
141 // TODO Auto-generated method stub