Migrate ntfbenchmark to OSGi DS
[controller.git] / benchmark / ntfbenchmark / src / main / java / ntfbenchmark / impl / NtfbenchmarkProvider.java
1 /*
2  * Copyright (c) 2015 Cisco Systems Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package ntfbenchmark.impl;
9
10 import static com.google.common.base.Verify.verifyNotNull;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.ArrayList;
15 import java.util.List;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.TimeUnit;
20 import javax.annotation.PreDestroy;
21 import javax.inject.Inject;
22 import javax.inject.Singleton;
23 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
24 import org.opendaylight.mdsal.binding.api.NotificationService;
25 import org.opendaylight.mdsal.binding.api.RpcProviderService;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.NtfbenchmarkService;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput.ProducerType;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusOutput;
33 import org.opendaylight.yangtools.concepts.ListenerRegistration;
34 import org.opendaylight.yangtools.concepts.Registration;
35 import org.opendaylight.yangtools.yang.common.RpcResult;
36 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
37 import org.opendaylight.yangtools.yang.common.Uint32;
38 import org.osgi.service.component.annotations.Activate;
39 import org.osgi.service.component.annotations.Component;
40 import org.osgi.service.component.annotations.Deactivate;
41 import org.osgi.service.component.annotations.Reference;
42 import org.osgi.service.component.annotations.RequireServiceComponentRuntime;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 @Singleton
47 @Component(service = {})
48 @RequireServiceComponentRuntime
49 public final class NtfbenchmarkProvider implements AutoCloseable, NtfbenchmarkService {
50     private static final Logger LOG = LoggerFactory.getLogger(NtfbenchmarkProvider.class);
51     private static final int TEST_TIMEOUT = 5;
52
53     private final NotificationService listenService;
54     private final NotificationPublishService publishService;
55     private final Registration reg;
56
57     @Inject
58     @Activate
59     public NtfbenchmarkProvider(@Reference final NotificationService listenService,
60             @Reference final NotificationPublishService publishService,
61             @Reference final RpcProviderService rpcService) {
62         this.listenService = requireNonNull(listenService);
63         this.publishService = requireNonNull(publishService);
64         reg = rpcService.registerRpcImplementation(NtfbenchmarkService.class, this);
65         LOG.debug("NtfbenchmarkProvider initiated");
66     }
67
68     @Override
69     @PreDestroy
70     @Deactivate
71     public void close() {
72         reg.close();
73         LOG.info("NtfbenchmarkProvider closed");
74     }
75
76     @Override
77     public ListenableFuture<RpcResult<StartTestOutput>> startTest(final StartTestInput input) {
78         final int producerCount = input.getProducers().intValue();
79         final int listenerCount = input.getListeners().intValue();
80         final int iterations = input.getIterations().intValue();
81         final int payloadSize = input.getIterations().intValue();
82
83         final List<AbstractNtfbenchProducer> producers = new ArrayList<>(producerCount);
84         final List<ListenerRegistration<NtfbenchTestListener>> listeners = new ArrayList<>(listenerCount);
85         for (int i = 0; i < producerCount; i++) {
86             producers.add(new NtfbenchBlockingProducer(publishService, iterations, payloadSize));
87         }
88         int expectedCntPerListener = producerCount * iterations;
89
90         for (int i = 0; i < listenerCount; i++) {
91             final NtfbenchTestListener listener;
92             if (input.getProducerType() == ProducerType.BLOCKING) {
93                 listener = new NtfbenchWTCListener(payloadSize, expectedCntPerListener);
94             } else {
95                 listener = new NtfbenchTestListener(payloadSize);
96             }
97             listeners.add(listenService.registerNotificationListener(listener));
98         }
99
100         try {
101             final ExecutorService executor = Executors.newFixedThreadPool(input.getProducers().intValue());
102
103             LOG.info("Test Started");
104             final long startTime = System.nanoTime();
105
106             for (int i = 0; i < input.getProducers().intValue(); i++) {
107                 // FIXME: fools RV_RETURN_VALUE_IGNORED_BAD_PRACTICE for now, but we should check some more
108                 verifyNotNull(executor.submit(producers.get(i)));
109             }
110             executor.shutdown();
111             try {
112                 executor.awaitTermination(TEST_TIMEOUT, TimeUnit.MINUTES);
113                 for (ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
114                     listenerRegistration.getInstance().getAllDone().get();
115                 }
116             } catch (final InterruptedException | ExecutionException e) {
117                 LOG.error("Out of time: test did not finish within the {} min deadline ", TEST_TIMEOUT);
118             }
119
120             final long producerEndTime = System.nanoTime();
121             final long producerElapsedTime = producerEndTime - startTime;
122
123             long allListeners = 0;
124             long allProducersOk = 0;
125             long allProducersError = 0;
126
127             for (final ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
128                 allListeners += listenerRegistration.getInstance().getReceived();
129             }
130
131             final long listenerElapsedTime = producerEndTime - startTime;
132
133             LOG.info("Test Done");
134
135             for (final AbstractNtfbenchProducer abstractNtfbenchProducer : producers) {
136                 allProducersOk += abstractNtfbenchProducer.getNtfOk();
137                 allProducersError += abstractNtfbenchProducer.getNtfError();
138             }
139
140             final StartTestOutput output = new StartTestOutputBuilder()
141                 .setProducerElapsedTime(Uint32.valueOf(producerElapsedTime / 1000000))
142                 .setListenerElapsedTime(Uint32.valueOf(listenerElapsedTime / 1000000))
143                 .setListenerOk(Uint32.valueOf(allListeners))
144                 .setProducerOk(Uint32.valueOf(allProducersOk))
145                 .setProducerError(Uint32.valueOf(allProducersError))
146                 .setProducerRate(
147                     Uint32.valueOf((allProducersOk + allProducersError) * 1000000000 / producerElapsedTime))
148                 .setListenerRate(Uint32.valueOf(allListeners * 1000000000 / listenerElapsedTime))
149                 .build();
150             return RpcResultBuilder.success(output).buildFuture();
151         } finally {
152             for (final ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
153                 listenerRegistration.close();
154             }
155         }
156     }
157
158     @Override
159     public ListenableFuture<RpcResult<TestStatusOutput>> testStatus(final TestStatusInput input) {
160         throw new UnsupportedOperationException("Not implemented");
161     }
162 }