2 * Copyright (c) 2015 Cisco Systems Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package ntfbenchmark.impl;
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.concurrent.ExecutionException;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.Executors;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.TimeUnit;
19 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
20 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
21 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
22 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.NtfbenchmarkService;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput.ProducerType;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusOutput;
29 import org.opendaylight.yangtools.concepts.ListenerRegistration;
30 import org.opendaylight.yangtools.yang.common.RpcResult;
31 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
35 public class NtfbenchmarkProvider implements BindingAwareProvider, AutoCloseable, NtfbenchmarkService {
37 private static final Logger LOG = LoggerFactory.getLogger(NtfbenchmarkProvider.class);
38 private NotificationService listenService;
39 private NotificationPublishService publishService;
40 private static final int testTimeout = 5;
/**
 * Creates the benchmark provider and stores the two notification services it works with.
 *
 * @param listenServiceDependency service used to register the benchmark listeners
 * @param publishServiceDependency service passed to the benchmark producers
 */
public NtfbenchmarkProvider(final NotificationService listenServiceDependency,
        final NotificationPublishService publishServiceDependency) {
    LOG.debug("NtfbenchmarkProvider Constructor");
    this.listenService = listenServiceDependency;
    this.publishService = publishServiceDependency;
}
50 public void onSessionInitiated(final ProviderContext session) {
51 LOG.debug("NtfbenchmarkProvider Session Initiated");
52 session.addRpcImplementation(NtfbenchmarkService.class, this);
56 public void close() throws Exception {
57 LOG.debug("NtfbenchmarkProvider Closed");
61 public Future<RpcResult<StartTestOutput>> startTest(final StartTestInput input) {
62 final int producerCount = input.getProducers().intValue();
63 final int listenerCount = input.getListeners().intValue();
64 final int iterations = input.getIterations().intValue();
65 final int payloadSize = input.getIterations().intValue();
67 final List<AbstractNtfbenchProducer> producers = new ArrayList<>(producerCount);
68 final List<ListenerRegistration<NtfbenchTestListener>> listeners = new ArrayList<>(listenerCount);
69 for (int i = 0; i < producerCount; i++) {
70 producers.add(new NtfbenchBlockingProducer(publishService, iterations, payloadSize));
72 int expectedCntPerListener = producerCount * iterations;
74 for (int i = 0; i < listenerCount; i++) {
75 final NtfbenchTestListener listener;
76 if (input.getProducerType() == ProducerType.BLOCKING) {
77 listener = new NtfbenchWTCListener(payloadSize, expectedCntPerListener);
79 listener = new NtfbenchTestListener(payloadSize);
81 listeners.add(listenService.registerNotificationListener(listener));
85 final ExecutorService executor = Executors.newFixedThreadPool(input.getProducers().intValue());
87 LOG.info("Test Started");
88 final long startTime = System.nanoTime();
90 for (int i = 0; i < input.getProducers().intValue(); i++) {
91 executor.submit(producers.get(i));
95 executor.awaitTermination(testTimeout, TimeUnit.MINUTES);
96 for (ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
97 listenerRegistration.getInstance().getAllDone().get();
99 } catch (final InterruptedException | ExecutionException e) {
100 LOG.error("Out of time: test did not finish within the {} min deadline ", testTimeout);
103 final long producerEndTime = System.nanoTime();
104 final long producerElapsedTime = producerEndTime - startTime;
106 long allListeners = 0;
107 long allProducersOk = 0;
108 long allProducersError = 0;
110 for (final ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
111 allListeners += listenerRegistration.getInstance().getReceived();
114 final long listenerEndTime = System.nanoTime();
115 final long listenerElapsedTime = producerEndTime - startTime;
117 LOG.info("Test Done");
119 for (final AbstractNtfbenchProducer abstractNtfbenchProducer : producers) {
120 allProducersOk += abstractNtfbenchProducer.getNtfOk();
121 allProducersError += abstractNtfbenchProducer.getNtfError();
124 final StartTestOutput output =
125 new StartTestOutputBuilder()
126 .setProducerElapsedTime(producerElapsedTime / 1000000)
127 .setListenerElapsedTime(listenerElapsedTime / 1000000)
128 .setListenerOk(allListeners)
129 .setProducerOk(allProducersOk)
130 .setProducerError(allProducersError)
131 .setProducerRate(((allProducersOk + allProducersError) * 1000000000) / producerElapsedTime)
132 .setListenerRate((allListeners * 1000000000) / listenerElapsedTime)
134 return RpcResultBuilder.success(output).buildFuture();
136 for (final ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
137 listenerRegistration.close();
143 public Future<RpcResult<TestStatusOutput>> testStatus() {
144 // TODO Auto-generated method stub