2 * Copyright (c) 2015 Cisco Systems Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package ntfbenchmark.impl;
10 import static com.google.common.base.Verify.verifyNotNull;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.collect.ImmutableClassToInstanceMap;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import java.util.ArrayList;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.TimeUnit;
20 import javax.annotation.PreDestroy;
21 import javax.inject.Inject;
22 import javax.inject.Singleton;
23 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
24 import org.opendaylight.mdsal.binding.api.NotificationService;
25 import org.opendaylight.mdsal.binding.api.RpcProviderService;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbench.payload.rev150709.Ntfbench;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTest;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput.ProducerType;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatus;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusOutput;
35 import org.opendaylight.yangtools.concepts.Registration;
36 import org.opendaylight.yangtools.yang.binding.Rpc;
37 import org.opendaylight.yangtools.yang.common.RpcResult;
38 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
39 import org.opendaylight.yangtools.yang.common.Uint32;
40 import org.osgi.service.component.annotations.Activate;
41 import org.osgi.service.component.annotations.Component;
42 import org.osgi.service.component.annotations.Deactivate;
43 import org.osgi.service.component.annotations.Reference;
44 import org.osgi.service.component.annotations.RequireServiceComponentRuntime;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
49 @Component(service = {})
50 @RequireServiceComponentRuntime
51 public final class NtfbenchmarkProvider implements AutoCloseable {
52 private static final Logger LOG = LoggerFactory.getLogger(NtfbenchmarkProvider.class);
53 private static final int TEST_TIMEOUT = 5;
55 private final NotificationService listenService;
56 private final NotificationPublishService publishService;
57 private final Registration reg;
61 public NtfbenchmarkProvider(@Reference final NotificationService listenService,
62 @Reference final NotificationPublishService publishService,
63 @Reference final RpcProviderService rpcService) {
64 this.listenService = requireNonNull(listenService);
65 this.publishService = requireNonNull(publishService);
66 reg = rpcService.registerRpcImplementations(ImmutableClassToInstanceMap.<Rpc<?, ?>>builder()
67 .put(TestStatus.class, this::testStatus)
68 .put(StartTest.class, this::startTest)
70 LOG.debug("NtfbenchmarkProvider initiated");
78 LOG.info("NtfbenchmarkProvider closed");
81 private ListenableFuture<RpcResult<StartTestOutput>> startTest(final StartTestInput input) {
82 final int producerCount = input.getProducers().intValue();
83 final int listenerCount = input.getListeners().intValue();
84 final int iterations = input.getIterations().intValue();
85 final int payloadSize = input.getIterations().intValue();
87 final var producers = new ArrayList<AbstractNtfbenchProducer>(producerCount);
88 for (int i = 0; i < producerCount; i++) {
89 producers.add(new NtfbenchBlockingProducer(publishService, iterations, payloadSize));
91 int expectedCntPerListener = producerCount * iterations;
93 final var listeners = new ArrayList<NtfbenchTestListener>(listenerCount);
94 final var registrations = new ArrayList<Registration>(listenerCount);
95 for (int i = 0; i < listenerCount; i++) {
96 final NtfbenchTestListener listener;
97 if (input.getProducerType() == ProducerType.BLOCKING) {
98 listener = new NtfbenchWTCListener(payloadSize, expectedCntPerListener);
100 listener = new NtfbenchTestListener(payloadSize);
102 listeners.add(listener);
103 registrations.add(listenService.registerListener(Ntfbench.class, listener));
107 final ExecutorService executor = Executors.newFixedThreadPool(input.getProducers().intValue());
109 LOG.info("Test Started");
110 final long startTime = System.nanoTime();
112 for (int i = 0; i < input.getProducers().intValue(); i++) {
113 // FIXME: fools RV_RETURN_VALUE_IGNORED_BAD_PRACTICE for now, but we should check some more
114 verifyNotNull(executor.submit(producers.get(i)));
118 executor.awaitTermination(TEST_TIMEOUT, TimeUnit.MINUTES);
119 for (var listener : listeners) {
120 listener.getAllDone().get();
122 } catch (final InterruptedException | ExecutionException e) {
123 LOG.error("Out of time: test did not finish within the {} min deadline ", TEST_TIMEOUT, e);
126 final long producerEndTime = System.nanoTime();
127 final long producerElapsedTime = producerEndTime - startTime;
129 long allListeners = 0;
130 long allProducersOk = 0;
131 long allProducersError = 0;
133 for (var listener : listeners) {
134 allListeners += listener.getReceived();
137 final long listenerElapsedTime = producerEndTime - startTime;
139 LOG.info("Test Done");
141 for (final AbstractNtfbenchProducer abstractNtfbenchProducer : producers) {
142 allProducersOk += abstractNtfbenchProducer.getNtfOk();
143 allProducersError += abstractNtfbenchProducer.getNtfError();
146 final StartTestOutput output = new StartTestOutputBuilder()
147 .setProducerElapsedTime(Uint32.valueOf(producerElapsedTime / 1000000))
148 .setListenerElapsedTime(Uint32.valueOf(listenerElapsedTime / 1000000))
149 .setListenerOk(Uint32.valueOf(allListeners))
150 .setProducerOk(Uint32.valueOf(allProducersOk))
151 .setProducerError(Uint32.valueOf(allProducersError))
153 Uint32.valueOf((allProducersOk + allProducersError) * 1000000000 / producerElapsedTime))
154 .setListenerRate(Uint32.valueOf(allListeners * 1000000000 / listenerElapsedTime))
156 return RpcResultBuilder.success(output).buildFuture();
158 registrations.forEach(Registration::close);
162 private ListenableFuture<RpcResult<TestStatusOutput>> testStatus(final TestStatusInput input) {
163 throw new UnsupportedOperationException("Not implemented");