Improve segmented journal actor metrics
[controller.git] / benchmark / ntfbenchmark / src / main / java / ntfbenchmark / impl / NtfbenchmarkProvider.java
1 /*
2  * Copyright (c) 2015 Cisco Systems Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package ntfbenchmark.impl;
9
10 import static com.google.common.base.Verify.verifyNotNull;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.ArrayList;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.TimeUnit;
19 import javax.annotation.PreDestroy;
20 import javax.inject.Inject;
21 import javax.inject.Singleton;
22 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
23 import org.opendaylight.mdsal.binding.api.NotificationService;
24 import org.opendaylight.mdsal.binding.api.RpcProviderService;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbench.payload.rev150709.Ntfbench;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTest;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput.ProducerType;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatus;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusOutput;
34 import org.opendaylight.yangtools.concepts.Registration;
35 import org.opendaylight.yangtools.yang.common.RpcResult;
36 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
37 import org.opendaylight.yangtools.yang.common.Uint32;
38 import org.osgi.service.component.annotations.Activate;
39 import org.osgi.service.component.annotations.Component;
40 import org.osgi.service.component.annotations.Deactivate;
41 import org.osgi.service.component.annotations.Reference;
42 import org.osgi.service.component.annotations.RequireServiceComponentRuntime;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 @Singleton
47 @Component(service = {})
48 @RequireServiceComponentRuntime
49 public final class NtfbenchmarkProvider implements AutoCloseable {
50     private static final Logger LOG = LoggerFactory.getLogger(NtfbenchmarkProvider.class);
51     private static final int TEST_TIMEOUT = 5;
52
53     private final NotificationService listenService;
54     private final NotificationPublishService publishService;
55     private final Registration reg;
56
57     @Inject
58     @Activate
59     public NtfbenchmarkProvider(@Reference final NotificationService listenService,
60             @Reference final NotificationPublishService publishService,
61             @Reference final RpcProviderService rpcService) {
62         this.listenService = requireNonNull(listenService);
63         this.publishService = requireNonNull(publishService);
64         reg = rpcService.registerRpcImplementations((TestStatus) this::testStatus, (StartTest) this::startTest);
65         LOG.debug("NtfbenchmarkProvider initiated");
66     }
67
68     @Override
69     @PreDestroy
70     @Deactivate
71     public void close() {
72         reg.close();
73         LOG.info("NtfbenchmarkProvider closed");
74     }
75
76     private ListenableFuture<RpcResult<StartTestOutput>> startTest(final StartTestInput input) {
77         final int producerCount = input.getProducers().intValue();
78         final int listenerCount = input.getListeners().intValue();
79         final int iterations = input.getIterations().intValue();
80         final int payloadSize = input.getIterations().intValue();
81
82         final var producers = new ArrayList<AbstractNtfbenchProducer>(producerCount);
83         for (int i = 0; i < producerCount; i++) {
84             producers.add(new NtfbenchBlockingProducer(publishService, iterations, payloadSize));
85         }
86         int expectedCntPerListener = producerCount * iterations;
87
88         final var listeners = new ArrayList<NtfbenchTestListener>(listenerCount);
89         final var registrations = new ArrayList<Registration>(listenerCount);
90         for (int i = 0; i < listenerCount; i++) {
91             final NtfbenchTestListener listener;
92             if (input.getProducerType() == ProducerType.BLOCKING) {
93                 listener = new NtfbenchWTCListener(payloadSize, expectedCntPerListener);
94             } else {
95                 listener = new NtfbenchTestListener(payloadSize);
96             }
97             listeners.add(listener);
98             registrations.add(listenService.registerListener(Ntfbench.class, listener));
99         }
100
101         try {
102             final ExecutorService executor = Executors.newFixedThreadPool(input.getProducers().intValue());
103
104             LOG.info("Test Started");
105             final long startTime = System.nanoTime();
106
107             for (int i = 0; i < input.getProducers().intValue(); i++) {
108                 // FIXME: fools RV_RETURN_VALUE_IGNORED_BAD_PRACTICE for now, but we should check some more
109                 verifyNotNull(executor.submit(producers.get(i)));
110             }
111             executor.shutdown();
112             try {
113                 executor.awaitTermination(TEST_TIMEOUT, TimeUnit.MINUTES);
114                 for (var listener : listeners) {
115                     listener.getAllDone().get();
116                 }
117             } catch (final InterruptedException | ExecutionException e) {
118                 LOG.error("Out of time: test did not finish within the {} min deadline ", TEST_TIMEOUT, e);
119             }
120
121             final long producerEndTime = System.nanoTime();
122             final long producerElapsedTime = producerEndTime - startTime;
123
124             long allListeners = 0;
125             long allProducersOk = 0;
126             long allProducersError = 0;
127
128             for (var listener : listeners) {
129                 allListeners += listener.getReceived();
130             }
131
132             final long listenerElapsedTime = producerEndTime - startTime;
133
134             LOG.info("Test Done");
135
136             for (final AbstractNtfbenchProducer abstractNtfbenchProducer : producers) {
137                 allProducersOk += abstractNtfbenchProducer.getNtfOk();
138                 allProducersError += abstractNtfbenchProducer.getNtfError();
139             }
140
141             final StartTestOutput output = new StartTestOutputBuilder()
142                 .setProducerElapsedTime(Uint32.valueOf(producerElapsedTime / 1000000))
143                 .setListenerElapsedTime(Uint32.valueOf(listenerElapsedTime / 1000000))
144                 .setListenerOk(Uint32.valueOf(allListeners))
145                 .setProducerOk(Uint32.valueOf(allProducersOk))
146                 .setProducerError(Uint32.valueOf(allProducersError))
147                 .setProducerRate(
148                     Uint32.valueOf((allProducersOk + allProducersError) * 1000000000 / producerElapsedTime))
149                 .setListenerRate(Uint32.valueOf(allListeners * 1000000000 / listenerElapsedTime))
150                 .build();
151             return RpcResultBuilder.success(output).buildFuture();
152         } finally {
153             registrations.forEach(Registration::close);
154         }
155     }
156
157     private ListenableFuture<RpcResult<TestStatusOutput>> testStatus(final TestStatusInput input) {
158         throw new UnsupportedOperationException("Not implemented");
159     }
160 }