X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=benchmark%2Fntfbenchmark%2Fsrc%2Fmain%2Fjava%2Fntfbenchmark%2Fimpl%2FNtfbenchmarkProvider.java;fp=benchmark%2Fntfbenchmark%2Fsrc%2Fmain%2Fjava%2Fntfbenchmark%2Fimpl%2FNtfbenchmarkProvider.java;h=35bea7808f1303137bfd915457a77b0b71aadddb;hb=46939989aefff4b3f7cfec5c95abc8cae9c79c47;hp=0000000000000000000000000000000000000000;hpb=9120c2cf7c530f566d8af94d4f9dc2cb2d3351f7;p=controller.git diff --git a/benchmark/ntfbenchmark/src/main/java/ntfbenchmark/impl/NtfbenchmarkProvider.java b/benchmark/ntfbenchmark/src/main/java/ntfbenchmark/impl/NtfbenchmarkProvider.java new file mode 100644 index 0000000000..35bea7808f --- /dev/null +++ b/benchmark/ntfbenchmark/src/main/java/ntfbenchmark/impl/NtfbenchmarkProvider.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2015 Cisco Systems Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package ntfbenchmark.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; +import org.opendaylight.controller.md.sal.binding.api.NotificationService; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext; +import org.opendaylight.controller.sal.binding.api.BindingAwareProvider; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.NtfbenchmarkService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestInput.ProducerType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.StartTestOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ntfbenchmark.rev150105.TestStatusOutput; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NtfbenchmarkProvider implements BindingAwareProvider, AutoCloseable, NtfbenchmarkService { + + private static final Logger LOG = LoggerFactory.getLogger(NtfbenchmarkProvider.class); + private NotificationService listenService; + private NotificationPublishService publishService; + private static final int testTimeout = 5; + + public NtfbenchmarkProvider(NotificationService listenServiceDependency, + NotificationPublishService publishServiceDependency) { + LOG.info("NtfbenchmarkProvider Constructor"); + listenService = listenServiceDependency; + publishService = publishServiceDependency; + } + + @Override + public void onSessionInitiated(final ProviderContext session) { + LOG.info("NtfbenchmarkProvider Session Initiated"); + session.addRpcImplementation(NtfbenchmarkService.class, this); + } + + @Override + public void close() throws Exception { + LOG.info("NtfbenchmarkProvider Closed"); + } + + @Override + public Future> startTest(final StartTestInput input) { + final int producerCount = input.getProducers().intValue(); + final int listenerCount = input.getListeners().intValue(); + final int iterations = input.getIterations().intValue(); + final int payloadSize = input.getIterations().intValue(); + + final List producers = new ArrayList<>(producerCount); + final List> listeners = new ArrayList<>(listenerCount); + for (int i = 0; i < producerCount; i++) { + producers.add(new NtfbenchBlockingProducer(publishService, iterations, payloadSize)); + } + int expectedCntPerListener = producerCount * iterations; + + for (int i = 0; i < listenerCount; i++) { + final NtfbenchTestListener listener; + if (input.getProducerType() == ProducerType.BLOCKING) { + listener = new NtfbenchWTCListener(payloadSize, expectedCntPerListener); + } else { + listener = new NtfbenchTestListener(payloadSize); + } + listeners.add(listenService.registerNotificationListener(listener)); + } + + try { + final ExecutorService executor = Executors.newFixedThreadPool(input.getProducers().intValue()); + + LOG.info("Test Started"); + final long startTime = System.nanoTime(); + + for (int i = 0; i < input.getProducers().intValue(); i++) { + executor.submit(producers.get(i)); + } + executor.shutdown(); + try { + executor.awaitTermination(testTimeout, TimeUnit.MINUTES); + for (ListenerRegistration listenerRegistration : listeners) { + listenerRegistration.getInstance().getAllDone().get(); + } + } catch (final InterruptedException | ExecutionException e) { + LOG.error("Out of time: test did not finish within the {} min deadline ", testTimeout); + } + + final long producerEndTime = System.nanoTime(); + final long producerElapsedTime = producerEndTime - startTime; + + long allListeners = 0; + long allProducersOk = 0; + long allProducersError = 0; + + for (final ListenerRegistration listenerRegistration : listeners) { + allListeners += listenerRegistration.getInstance().getReceived(); + } + + final long listenerEndTime = System.nanoTime(); + final long listenerElapsedTime = producerEndTime - startTime; + + LOG.info("Test Done"); + + for (final AbstractNtfbenchProducer abstractNtfbenchProducer : producers) { + allProducersOk += abstractNtfbenchProducer.getNtfOk(); + allProducersError += abstractNtfbenchProducer.getNtfError(); + } + + final StartTestOutput output = + new StartTestOutputBuilder() + .setProducerElapsedTime(producerElapsedTime / 1000000) + .setListenerElapsedTime(listenerElapsedTime / 1000000) + .setListenerOk(allListeners) + .setProducerOk(allProducersOk) + .setProducerError(allProducersError) + .setProducerRate(((allProducersOk + allProducersError) * 1000000000) / producerElapsedTime) + .setListenerRate((allListeners * 1000000000) / listenerElapsedTime) + .build(); + return RpcResultBuilder.success(output).buildFuture(); + } finally { + for (final ListenerRegistration listenerRegistration : listeners) { + listenerRegistration.close(); + } + } + } + + @Override + public Future> testStatus() { + // TODO Auto-generated method stub + return null; + } + +}