/*
 * Copyright (c) 2015 Cisco Systems Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package ntfbenchmark.impl;
10 import static com.google.common.base.Verify.verifyNotNull;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.ArrayList;
15 import java.util.List;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.TimeUnit;
20 import javax.annotation.PreDestroy;
21 import javax.inject.Inject;
22 import javax.inject.Singleton;
23 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
24 import org.opendaylight.mdsal.binding.api.NotificationService;
25 import org.opendaylight.mdsal.binding.api.RpcProviderService;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.NtfbenchmarkService;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput.ProducerType;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusOutput;
33 import org.opendaylight.yangtools.concepts.ListenerRegistration;
34 import org.opendaylight.yangtools.concepts.Registration;
35 import org.opendaylight.yangtools.yang.common.RpcResult;
36 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
37 import org.opendaylight.yangtools.yang.common.Uint32;
38 import org.osgi.service.component.annotations.Activate;
39 import org.osgi.service.component.annotations.Component;
40 import org.osgi.service.component.annotations.Deactivate;
41 import org.osgi.service.component.annotations.Reference;
42 import org.osgi.service.component.annotations.RequireServiceComponentRuntime;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 @Component(service = {})
48 @RequireServiceComponentRuntime
49 public final class NtfbenchmarkProvider implements AutoCloseable, NtfbenchmarkService {
50 private static final Logger LOG = LoggerFactory.getLogger(NtfbenchmarkProvider.class);
51 private static final int TEST_TIMEOUT = 5;
53 private final NotificationService listenService;
54 private final NotificationPublishService publishService;
55 private final Registration reg;
59 public NtfbenchmarkProvider(@Reference final NotificationService listenService,
60 @Reference final NotificationPublishService publishService,
61 @Reference final RpcProviderService rpcService) {
62 this.listenService = requireNonNull(listenService);
63 this.publishService = requireNonNull(publishService);
64 reg = rpcService.registerRpcImplementation(NtfbenchmarkService.class, this);
65 LOG.debug("NtfbenchmarkProvider initiated");
73 LOG.info("NtfbenchmarkProvider closed");
77 public ListenableFuture<RpcResult<StartTestOutput>> startTest(final StartTestInput input) {
78 final int producerCount = input.getProducers().intValue();
79 final int listenerCount = input.getListeners().intValue();
80 final int iterations = input.getIterations().intValue();
81 final int payloadSize = input.getIterations().intValue();
83 final List<AbstractNtfbenchProducer> producers = new ArrayList<>(producerCount);
84 final List<ListenerRegistration<NtfbenchTestListener>> listeners = new ArrayList<>(listenerCount);
85 for (int i = 0; i < producerCount; i++) {
86 producers.add(new NtfbenchBlockingProducer(publishService, iterations, payloadSize));
88 int expectedCntPerListener = producerCount * iterations;
90 for (int i = 0; i < listenerCount; i++) {
91 final NtfbenchTestListener listener;
92 if (input.getProducerType() == ProducerType.BLOCKING) {
93 listener = new NtfbenchWTCListener(payloadSize, expectedCntPerListener);
95 listener = new NtfbenchTestListener(payloadSize);
97 listeners.add(listenService.registerNotificationListener(listener));
101 final ExecutorService executor = Executors.newFixedThreadPool(input.getProducers().intValue());
103 LOG.info("Test Started");
104 final long startTime = System.nanoTime();
106 for (int i = 0; i < input.getProducers().intValue(); i++) {
107 // FIXME: fools RV_RETURN_VALUE_IGNORED_BAD_PRACTICE for now, but we should check some more
108 verifyNotNull(executor.submit(producers.get(i)));
112 executor.awaitTermination(TEST_TIMEOUT, TimeUnit.MINUTES);
113 for (ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
114 listenerRegistration.getInstance().getAllDone().get();
116 } catch (final InterruptedException | ExecutionException e) {
117 LOG.error("Out of time: test did not finish within the {} min deadline ", TEST_TIMEOUT);
120 final long producerEndTime = System.nanoTime();
121 final long producerElapsedTime = producerEndTime - startTime;
123 long allListeners = 0;
124 long allProducersOk = 0;
125 long allProducersError = 0;
127 for (final ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
128 allListeners += listenerRegistration.getInstance().getReceived();
131 final long listenerElapsedTime = producerEndTime - startTime;
133 LOG.info("Test Done");
135 for (final AbstractNtfbenchProducer abstractNtfbenchProducer : producers) {
136 allProducersOk += abstractNtfbenchProducer.getNtfOk();
137 allProducersError += abstractNtfbenchProducer.getNtfError();
140 final StartTestOutput output = new StartTestOutputBuilder()
141 .setProducerElapsedTime(Uint32.valueOf(producerElapsedTime / 1000000))
142 .setListenerElapsedTime(Uint32.valueOf(listenerElapsedTime / 1000000))
143 .setListenerOk(Uint32.valueOf(allListeners))
144 .setProducerOk(Uint32.valueOf(allProducersOk))
145 .setProducerError(Uint32.valueOf(allProducersError))
147 Uint32.valueOf((allProducersOk + allProducersError) * 1000000000 / producerElapsedTime))
148 .setListenerRate(Uint32.valueOf(allListeners * 1000000000 / listenerElapsedTime))
150 return RpcResultBuilder.success(output).buildFuture();
152 for (final ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
153 listenerRegistration.close();
159 public ListenableFuture<RpcResult<TestStatusOutput>> testStatus(final TestStatusInput input) {
160 throw new UnsupportedOperationException("Not implemented");