Downgrade most info messages in benchmarks
[controller.git] / benchmark / ntfbenchmark / src / main / java / ntfbenchmark / impl / NtfbenchmarkProvider.java
1 /*
2  * Copyright (c) 2015 Cisco Systems Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package ntfbenchmark.impl;
10
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.concurrent.ExecutionException;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.Executors;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.TimeUnit;
18
19 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
20 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
21 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
22 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.NtfbenchmarkService;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput.ProducerType;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusOutput;
29 import org.opendaylight.yangtools.concepts.ListenerRegistration;
30 import org.opendaylight.yangtools.yang.common.RpcResult;
31 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 public class NtfbenchmarkProvider implements BindingAwareProvider, AutoCloseable, NtfbenchmarkService {
36
37     private static final Logger LOG = LoggerFactory.getLogger(NtfbenchmarkProvider.class);
38     private NotificationService listenService;
39     private NotificationPublishService publishService;
40     private static final int testTimeout = 5;
41
42     public NtfbenchmarkProvider(NotificationService listenServiceDependency,
43             NotificationPublishService publishServiceDependency) {
44         LOG.debug("NtfbenchmarkProvider Constructor");
45         listenService = listenServiceDependency;
46         publishService = publishServiceDependency;
47     }
48
49     @Override
50     public void onSessionInitiated(final ProviderContext session) {
51         LOG.debug("NtfbenchmarkProvider Session Initiated");
52         session.addRpcImplementation(NtfbenchmarkService.class, this);
53     }
54
55     @Override
56     public void close() throws Exception {
57         LOG.debug("NtfbenchmarkProvider Closed");
58     }
59
60     @Override
61     public Future<RpcResult<StartTestOutput>> startTest(final StartTestInput input) {
62         final int producerCount = input.getProducers().intValue();
63         final int listenerCount = input.getListeners().intValue();
64         final int iterations = input.getIterations().intValue();
65         final int payloadSize = input.getIterations().intValue();
66
67         final List<AbstractNtfbenchProducer> producers = new ArrayList<>(producerCount);
68         final List<ListenerRegistration<NtfbenchTestListener>> listeners = new ArrayList<>(listenerCount);
69         for (int i = 0; i < producerCount; i++) {
70             producers.add(new NtfbenchBlockingProducer(publishService, iterations, payloadSize));
71         }
72         int expectedCntPerListener = producerCount * iterations;
73
74         for (int i = 0; i < listenerCount; i++) {
75             final NtfbenchTestListener listener;
76             if (input.getProducerType() == ProducerType.BLOCKING) {
77                 listener = new NtfbenchWTCListener(payloadSize, expectedCntPerListener);
78             } else {
79                 listener = new NtfbenchTestListener(payloadSize);
80             }
81             listeners.add(listenService.registerNotificationListener(listener));
82         }
83
84         try {
85             final ExecutorService executor = Executors.newFixedThreadPool(input.getProducers().intValue());
86
87             LOG.info("Test Started");
88             final long startTime = System.nanoTime();
89
90             for (int i = 0; i < input.getProducers().intValue(); i++) {
91                 executor.submit(producers.get(i));
92             }
93             executor.shutdown();
94             try {
95                 executor.awaitTermination(testTimeout, TimeUnit.MINUTES);
96                 for (ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
97                     listenerRegistration.getInstance().getAllDone().get();
98                 }
99             } catch (final InterruptedException | ExecutionException e) {
100                 LOG.error("Out of time: test did not finish within the {} min deadline ", testTimeout);
101             }
102
103             final long producerEndTime = System.nanoTime();
104             final long producerElapsedTime = producerEndTime - startTime;
105
106             long allListeners = 0;
107             long allProducersOk = 0;
108             long allProducersError = 0;
109
110             for (final ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
111                 allListeners += listenerRegistration.getInstance().getReceived();
112             }
113
114             final long listenerEndTime = System.nanoTime();
115             final long listenerElapsedTime = producerEndTime - startTime;
116
117             LOG.info("Test Done");
118
119             for (final AbstractNtfbenchProducer abstractNtfbenchProducer : producers) {
120                 allProducersOk += abstractNtfbenchProducer.getNtfOk();
121                 allProducersError += abstractNtfbenchProducer.getNtfError();
122             }
123
124             final StartTestOutput output =
125                     new StartTestOutputBuilder()
126                             .setProducerElapsedTime(producerElapsedTime / 1000000)
127                             .setListenerElapsedTime(listenerElapsedTime / 1000000)
128                             .setListenerOk(allListeners)
129                             .setProducerOk(allProducersOk)
130                             .setProducerError(allProducersError)
131                             .setProducerRate(((allProducersOk + allProducersError) * 1000000000) / producerElapsedTime)
132                             .setListenerRate((allListeners * 1000000000) / listenerElapsedTime)
133                            .build();
134             return RpcResultBuilder.success(output).buildFuture();
135         } finally {
136             for (final ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
137                 listenerRegistration.close();
138             }
139         }
140     }
141
142     @Override
143     public Future<RpcResult<TestStatusOutput>> testStatus() {
144         // TODO Auto-generated method stub
145         return null;
146     }
147
148 }