/*
 * Copyright (c) 2015 Cisco Systems Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package ntfbenchmark.impl;
10 import static com.google.common.base.Verify.verifyNotNull;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import java.util.ArrayList;
14 import java.util.List;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.TimeUnit;
19 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
20 import org.opendaylight.mdsal.binding.api.NotificationService;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.NtfbenchmarkService;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput.ProducerType;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusOutput;
28 import org.opendaylight.yangtools.concepts.ListenerRegistration;
29 import org.opendaylight.yangtools.yang.common.RpcResult;
30 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
31 import org.opendaylight.yangtools.yang.common.Uint32;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
35 public class NtfbenchmarkProvider implements AutoCloseable, NtfbenchmarkService {
/** Logger shared by all instances of this provider. */
36 private static final Logger LOG = LoggerFactory.getLogger(NtfbenchmarkProvider.class);
/** Bound, in minutes, on how long startTest waits for the producer thread pool to terminate. */
37 private static final int TEST_TIMEOUT = 5;
/** Notification service that the benchmark listeners are registered with. */
39 private final NotificationService listenService;
/** Publish service handed to every producer when it is constructed. */
40 private final NotificationPublishService publishService;
/**
 * Creates the provider and stores the two notification services it needs to run a benchmark.
 *
 * @param listenServiceDependency service used to register the benchmark listeners
 * @param publishServiceDependency service passed to the benchmark producers
 */
public NtfbenchmarkProvider(
        final NotificationService listenServiceDependency,
        final NotificationPublishService publishServiceDependency) {
    LOG.debug("NtfbenchmarkProvider Constructor");
    this.listenService = listenServiceDependency;
    this.publishService = publishServiceDependency;
}
50 LOG.info("NtfbenchmarkProvider initiated");
55 LOG.info("NtfbenchmarkProvider closed");
59 public ListenableFuture<RpcResult<StartTestOutput>> startTest(final StartTestInput input) {
60 final int producerCount = input.getProducers().intValue();
61 final int listenerCount = input.getListeners().intValue();
62 final int iterations = input.getIterations().intValue();
63 final int payloadSize = input.getIterations().intValue();
65 final List<AbstractNtfbenchProducer> producers = new ArrayList<>(producerCount);
66 final List<ListenerRegistration<NtfbenchTestListener>> listeners = new ArrayList<>(listenerCount);
67 for (int i = 0; i < producerCount; i++) {
68 producers.add(new NtfbenchBlockingProducer(publishService, iterations, payloadSize));
70 int expectedCntPerListener = producerCount * iterations;
72 for (int i = 0; i < listenerCount; i++) {
73 final NtfbenchTestListener listener;
74 if (input.getProducerType() == ProducerType.BLOCKING) {
75 listener = new NtfbenchWTCListener(payloadSize, expectedCntPerListener);
77 listener = new NtfbenchTestListener(payloadSize);
79 listeners.add(listenService.registerNotificationListener(listener));
83 final ExecutorService executor = Executors.newFixedThreadPool(input.getProducers().intValue());
85 LOG.info("Test Started");
86 final long startTime = System.nanoTime();
88 for (int i = 0; i < input.getProducers().intValue(); i++) {
89 // FIXME: fools RV_RETURN_VALUE_IGNORED_BAD_PRACTICE for now, but we should check some more
90 verifyNotNull(executor.submit(producers.get(i)));
94 executor.awaitTermination(TEST_TIMEOUT, TimeUnit.MINUTES);
95 for (ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
96 listenerRegistration.getInstance().getAllDone().get();
98 } catch (final InterruptedException | ExecutionException e) {
99 LOG.error("Out of time: test did not finish within the {} min deadline ", TEST_TIMEOUT);
102 final long producerEndTime = System.nanoTime();
103 final long producerElapsedTime = producerEndTime - startTime;
105 long allListeners = 0;
106 long allProducersOk = 0;
107 long allProducersError = 0;
109 for (final ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
110 allListeners += listenerRegistration.getInstance().getReceived();
113 final long listenerElapsedTime = producerEndTime - startTime;
115 LOG.info("Test Done");
117 for (final AbstractNtfbenchProducer abstractNtfbenchProducer : producers) {
118 allProducersOk += abstractNtfbenchProducer.getNtfOk();
119 allProducersError += abstractNtfbenchProducer.getNtfError();
122 final StartTestOutput output =
123 new StartTestOutputBuilder()
124 .setProducerElapsedTime(Uint32.valueOf(producerElapsedTime / 1000000))
125 .setListenerElapsedTime(Uint32.valueOf(listenerElapsedTime / 1000000))
126 .setListenerOk(Uint32.valueOf(allListeners))
127 .setProducerOk(Uint32.valueOf(allProducersOk))
128 .setProducerError(Uint32.valueOf(allProducersError))
129 .setProducerRate(Uint32.valueOf((allProducersOk + allProducersError) * 1000000000
130 / producerElapsedTime))
131 .setListenerRate(Uint32.valueOf(allListeners * 1000000000 / listenerElapsedTime))
133 return RpcResultBuilder.success(output).buildFuture();
135 for (final ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
136 listenerRegistration.close();
142 public ListenableFuture<RpcResult<TestStatusOutput>> testStatus(final TestStatusInput input) {
143 // TODO Auto-generated method stub