Migrate ntfbenchmark
[controller.git] / benchmark / ntfbenchmark / src / main / java / ntfbenchmark / impl / NtfbenchmarkProvider.java
1 /*
2  * Copyright (c) 2015 Cisco Systems Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package ntfbenchmark.impl;
9
10 import static com.google.common.base.Verify.verifyNotNull;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.collect.ImmutableClassToInstanceMap;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import java.util.ArrayList;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.TimeUnit;
20 import javax.annotation.PreDestroy;
21 import javax.inject.Inject;
22 import javax.inject.Singleton;
23 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
24 import org.opendaylight.mdsal.binding.api.NotificationService;
25 import org.opendaylight.mdsal.binding.api.RpcProviderService;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbench.payload.rev150709.Ntfbench;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTest;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput.ProducerType;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatus;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusOutput;
35 import org.opendaylight.yangtools.concepts.Registration;
36 import org.opendaylight.yangtools.yang.binding.Rpc;
37 import org.opendaylight.yangtools.yang.common.RpcResult;
38 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
39 import org.opendaylight.yangtools.yang.common.Uint32;
40 import org.osgi.service.component.annotations.Activate;
41 import org.osgi.service.component.annotations.Component;
42 import org.osgi.service.component.annotations.Deactivate;
43 import org.osgi.service.component.annotations.Reference;
44 import org.osgi.service.component.annotations.RequireServiceComponentRuntime;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 @Singleton
49 @Component(service = {})
50 @RequireServiceComponentRuntime
51 public final class NtfbenchmarkProvider implements AutoCloseable {
52     private static final Logger LOG = LoggerFactory.getLogger(NtfbenchmarkProvider.class);
53     private static final int TEST_TIMEOUT = 5;
54
55     private final NotificationService listenService;
56     private final NotificationPublishService publishService;
57     private final Registration reg;
58
59     @Inject
60     @Activate
61     public NtfbenchmarkProvider(@Reference final NotificationService listenService,
62             @Reference final NotificationPublishService publishService,
63             @Reference final RpcProviderService rpcService) {
64         this.listenService = requireNonNull(listenService);
65         this.publishService = requireNonNull(publishService);
66         reg = rpcService.registerRpcImplementations(ImmutableClassToInstanceMap.<Rpc<?, ?>>builder()
67             .put(TestStatus.class, this::testStatus)
68             .put(StartTest.class, this::startTest)
69             .build());
70         LOG.debug("NtfbenchmarkProvider initiated");
71     }
72
73     @Override
74     @PreDestroy
75     @Deactivate
76     public void close() {
77         reg.close();
78         LOG.info("NtfbenchmarkProvider closed");
79     }
80
81     private ListenableFuture<RpcResult<StartTestOutput>> startTest(final StartTestInput input) {
82         final int producerCount = input.getProducers().intValue();
83         final int listenerCount = input.getListeners().intValue();
84         final int iterations = input.getIterations().intValue();
85         final int payloadSize = input.getIterations().intValue();
86
87         final var producers = new ArrayList<AbstractNtfbenchProducer>(producerCount);
88         for (int i = 0; i < producerCount; i++) {
89             producers.add(new NtfbenchBlockingProducer(publishService, iterations, payloadSize));
90         }
91         int expectedCntPerListener = producerCount * iterations;
92
93         final var listeners = new ArrayList<NtfbenchTestListener>(listenerCount);
94         final var registrations = new ArrayList<Registration>(listenerCount);
95         for (int i = 0; i < listenerCount; i++) {
96             final NtfbenchTestListener listener;
97             if (input.getProducerType() == ProducerType.BLOCKING) {
98                 listener = new NtfbenchWTCListener(payloadSize, expectedCntPerListener);
99             } else {
100                 listener = new NtfbenchTestListener(payloadSize);
101             }
102             listeners.add(listener);
103             registrations.add(listenService.registerListener(Ntfbench.class, listener));
104         }
105
106         try {
107             final ExecutorService executor = Executors.newFixedThreadPool(input.getProducers().intValue());
108
109             LOG.info("Test Started");
110             final long startTime = System.nanoTime();
111
112             for (int i = 0; i < input.getProducers().intValue(); i++) {
113                 // FIXME: fools RV_RETURN_VALUE_IGNORED_BAD_PRACTICE for now, but we should check some more
114                 verifyNotNull(executor.submit(producers.get(i)));
115             }
116             executor.shutdown();
117             try {
118                 executor.awaitTermination(TEST_TIMEOUT, TimeUnit.MINUTES);
119                 for (var listener : listeners) {
120                     listener.getAllDone().get();
121                 }
122             } catch (final InterruptedException | ExecutionException e) {
123                 LOG.error("Out of time: test did not finish within the {} min deadline ", TEST_TIMEOUT, e);
124             }
125
126             final long producerEndTime = System.nanoTime();
127             final long producerElapsedTime = producerEndTime - startTime;
128
129             long allListeners = 0;
130             long allProducersOk = 0;
131             long allProducersError = 0;
132
133             for (var listener : listeners) {
134                 allListeners += listener.getReceived();
135             }
136
137             final long listenerElapsedTime = producerEndTime - startTime;
138
139             LOG.info("Test Done");
140
141             for (final AbstractNtfbenchProducer abstractNtfbenchProducer : producers) {
142                 allProducersOk += abstractNtfbenchProducer.getNtfOk();
143                 allProducersError += abstractNtfbenchProducer.getNtfError();
144             }
145
146             final StartTestOutput output = new StartTestOutputBuilder()
147                 .setProducerElapsedTime(Uint32.valueOf(producerElapsedTime / 1000000))
148                 .setListenerElapsedTime(Uint32.valueOf(listenerElapsedTime / 1000000))
149                 .setListenerOk(Uint32.valueOf(allListeners))
150                 .setProducerOk(Uint32.valueOf(allProducersOk))
151                 .setProducerError(Uint32.valueOf(allProducersError))
152                 .setProducerRate(
153                     Uint32.valueOf((allProducersOk + allProducersError) * 1000000000 / producerElapsedTime))
154                 .setListenerRate(Uint32.valueOf(allListeners * 1000000000 / listenerElapsedTime))
155                 .build();
156             return RpcResultBuilder.success(output).buildFuture();
157         } finally {
158             registrations.forEach(Registration::close);
159         }
160     }
161
162     private ListenableFuture<RpcResult<TestStatusOutput>> testStatus(final TestStatusInput input) {
163         throw new UnsupportedOperationException("Not implemented");
164     }
165 }