Adjust for Binding RPC codegen changes
[controller.git] / benchmark / ntfbenchmark / src / main / java / ntfbenchmark / impl / NtfbenchmarkProvider.java
1 /*
2  * Copyright (c) 2015 Cisco Systems Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package ntfbenchmark.impl;
9
10 import com.google.common.util.concurrent.ListenableFuture;
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.concurrent.ExecutionException;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.Executors;
16 import java.util.concurrent.TimeUnit;
17 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
18 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.NtfbenchmarkService;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput.ProducerType;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusInput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusOutput;
26 import org.opendaylight.yangtools.concepts.ListenerRegistration;
27 import org.opendaylight.yangtools.yang.common.RpcResult;
28 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 public class NtfbenchmarkProvider implements AutoCloseable, NtfbenchmarkService {
33
34     private static final Logger LOG = LoggerFactory.getLogger(NtfbenchmarkProvider.class);
35     private final NotificationService listenService;
36     private final NotificationPublishService publishService;
37     private static final int testTimeout = 5;
38
39     public NtfbenchmarkProvider(final NotificationService listenServiceDependency,
40             final NotificationPublishService publishServiceDependency) {
41         LOG.debug("NtfbenchmarkProvider Constructor");
42         listenService = listenServiceDependency;
43         publishService = publishServiceDependency;
44     }
45
46     public void init() {
47         LOG.info("NtfbenchmarkProvider initiated");
48     }
49
50     @Override
51     public void close() throws Exception {
52         LOG.info("NtfbenchmarkProvider closed");
53     }
54
55     @Override
56     public ListenableFuture<RpcResult<StartTestOutput>> startTest(final StartTestInput input) {
57         final int producerCount = input.getProducers().intValue();
58         final int listenerCount = input.getListeners().intValue();
59         final int iterations = input.getIterations().intValue();
60         final int payloadSize = input.getIterations().intValue();
61
62         final List<AbstractNtfbenchProducer> producers = new ArrayList<>(producerCount);
63         final List<ListenerRegistration<NtfbenchTestListener>> listeners = new ArrayList<>(listenerCount);
64         for (int i = 0; i < producerCount; i++) {
65             producers.add(new NtfbenchBlockingProducer(publishService, iterations, payloadSize));
66         }
67         int expectedCntPerListener = producerCount * iterations;
68
69         for (int i = 0; i < listenerCount; i++) {
70             final NtfbenchTestListener listener;
71             if (input.getProducerType() == ProducerType.BLOCKING) {
72                 listener = new NtfbenchWTCListener(payloadSize, expectedCntPerListener);
73             } else {
74                 listener = new NtfbenchTestListener(payloadSize);
75             }
76             listeners.add(listenService.registerNotificationListener(listener));
77         }
78
79         try {
80             final ExecutorService executor = Executors.newFixedThreadPool(input.getProducers().intValue());
81
82             LOG.info("Test Started");
83             final long startTime = System.nanoTime();
84
85             for (int i = 0; i < input.getProducers().intValue(); i++) {
86                 executor.submit(producers.get(i));
87             }
88             executor.shutdown();
89             try {
90                 executor.awaitTermination(testTimeout, TimeUnit.MINUTES);
91                 for (ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
92                     listenerRegistration.getInstance().getAllDone().get();
93                 }
94             } catch (final InterruptedException | ExecutionException e) {
95                 LOG.error("Out of time: test did not finish within the {} min deadline ", testTimeout);
96             }
97
98             final long producerEndTime = System.nanoTime();
99             final long producerElapsedTime = producerEndTime - startTime;
100
101             long allListeners = 0;
102             long allProducersOk = 0;
103             long allProducersError = 0;
104
105             for (final ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
106                 allListeners += listenerRegistration.getInstance().getReceived();
107             }
108
109             final long listenerEndTime = System.nanoTime();
110             final long listenerElapsedTime = producerEndTime - startTime;
111
112             LOG.info("Test Done");
113
114             for (final AbstractNtfbenchProducer abstractNtfbenchProducer : producers) {
115                 allProducersOk += abstractNtfbenchProducer.getNtfOk();
116                 allProducersError += abstractNtfbenchProducer.getNtfError();
117             }
118
119             final StartTestOutput output =
120                     new StartTestOutputBuilder()
121                             .setProducerElapsedTime(producerElapsedTime / 1000000)
122                             .setListenerElapsedTime(listenerElapsedTime / 1000000)
123                             .setListenerOk(allListeners)
124                             .setProducerOk(allProducersOk)
125                             .setProducerError(allProducersError)
126                             .setProducerRate((allProducersOk + allProducersError) * 1000000000 / producerElapsedTime)
127                             .setListenerRate(allListeners * 1000000000 / listenerElapsedTime)
128                            .build();
129             return RpcResultBuilder.success(output).buildFuture();
130         } finally {
131             for (final ListenerRegistration<NtfbenchTestListener> listenerRegistration : listeners) {
132                 listenerRegistration.close();
133             }
134         }
135     }
136
137     @Override
138     public ListenableFuture<RpcResult<TestStatusOutput>> testStatus(final TestStatusInput input) {
139         // TODO Auto-generated method stub
140         return null;
141     }
142
143 }