X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=benchmark%2Frpcbenchmark%2Fsrc%2Fmain%2Fjava%2Frpcbenchmark%2Fimpl%2FRpcbenchmarkProvider.java;h=ddcf2547007a095b46a900e11ffe4977746dcd9d;hb=refs%2Fchanges%2F64%2F84164%2F1;hp=e06e3c02f8fa5cd0a288f5cd065f0cccadddf2e5;hpb=2bf2e52f7001e81608c8219f35a1bf4f637aa5f1;p=controller.git diff --git a/benchmark/rpcbenchmark/src/main/java/rpcbenchmark/impl/RpcbenchmarkProvider.java b/benchmark/rpcbenchmark/src/main/java/rpcbenchmark/impl/RpcbenchmarkProvider.java index e06e3c02f8..ddcf254700 100644 --- a/benchmark/rpcbenchmark/src/main/java/rpcbenchmark/impl/RpcbenchmarkProvider.java +++ b/benchmark/rpcbenchmark/src/main/java/rpcbenchmark/impl/RpcbenchmarkProvider.java @@ -5,21 +5,18 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package rpcbenchmark.impl; +import static com.google.common.base.Verify.verifyNotNull; + +import com.google.common.util.concurrent.ListenableFuture; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; - -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration; -import org.opendaylight.controller.sal.binding.api.BindingAwareProvider; -import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.NodeContext; import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.RpcbenchPayloadService; @@ -30,6 +27,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbench import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTestInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTestOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTestOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusOutput.ExecStatus; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusOutputBuilder; @@ -40,90 +38,83 @@ import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class RpcbenchmarkProvider implements BindingAwareProvider, AutoCloseable, RpcbenchmarkService { +public class RpcbenchmarkProvider implements AutoCloseable, RpcbenchmarkService { private static final Logger LOG = LoggerFactory.getLogger(RpcbenchmarkProvider.class); - private static final GlobalBindingRTCServer gServer = new GlobalBindingRTCServer(); - private static final int testTimeout = 5; - private final AtomicReference execStatus = new AtomicReference(ExecStatus.Idle ); - private RpcConsumerRegistry consumerRegistry; - private RpcProviderRegistry providerRegistry; + private static final int TEST_TIMEOUT = 5; - @Override - public void onSessionInitiated(ProviderContext session) { - LOG.info("RpcbenchmarkProvider Session Initiated"); - consumerRegistry = session.getSALService(RpcConsumerRegistry.class); - providerRegistry = session.getSALService(RpcProviderRegistry.class); - - // Register the benchmark Global RPC - session.addRpcImplementation(RpcbenchPayloadService.class, gServer); - // Register RPC Benchmark's control REST API - session.addRpcImplementation(RpcbenchmarkService.class, this); + private final GlobalBindingRTCServer globalServer; + private final AtomicReference execStatus = new AtomicReference<>(ExecStatus.Idle); + private final RpcProviderRegistry providerRegistry; + + public RpcbenchmarkProvider(final RpcProviderRegistry providerRegistry, final GlobalBindingRTCServer globalServer) { + this.providerRegistry = providerRegistry; + this.globalServer = globalServer; + } + + public void init() { + LOG.info("RpcbenchmarkProvider initiated"); } @Override - public void close() throws Exception { - LOG.info("RpcbenchmarkProvider Closed"); + public void close() { + LOG.info("RpcbenchmarkProvider closed"); } @Override - public Future> startTest(final StartTestInput input) { - LOG.info("startTest {}", input); + public ListenableFuture> startTest(final StartTestInput input) { + LOG.debug("startTest {}", input); final RTCClient client; final List> rpcRegs = new ArrayList<>(); switch (input.getOperation()) { - case ROUTEDRTC: - List> routeIid = new ArrayList<>(); - for (int i = 0; i < input.getNumServers().intValue(); i++) { - GlobalBindingRTCServer server = new GlobalBindingRTCServer(); - RoutedRpcRegistration routedReg = - providerRegistry.addRoutedRpcImplementation(RpcbenchPayloadService.class, server); - - KeyedInstanceIdentifier iid = - InstanceIdentifier + case ROUTEDRTC: + List> routeIid = new ArrayList<>(); + for (int i = 0; i < input.getNumServers().intValue(); i++) { + GlobalBindingRTCServer server = new GlobalBindingRTCServer(); + RoutedRpcRegistration routedReg = + providerRegistry.addRoutedRpcImplementation(RpcbenchPayloadService.class, server); + + KeyedInstanceIdentifier iid = + InstanceIdentifier .create(RpcbenchRpcRoutes.class) .child(RpcRoute.class, new RpcRouteKey(Integer.toString(i))); - routeIid.add(iid); - routedReg.registerPath(NodeContext.class, iid); - rpcRegs.add(routedReg); - } + routeIid.add(iid); + routedReg.registerPath(NodeContext.class, iid); + rpcRegs.add(routedReg); + } - client = new RoutedBindingRTClient(consumerRegistry, input.getPayloadSize().intValue(), routeIid); - break; + client = new RoutedBindingRTClient(providerRegistry, input.getPayloadSize().intValue(), routeIid); + break; - case GLOBALRTC: - client = new GlobalBindingRTCClient(consumerRegistry, input.getPayloadSize().intValue()); - break; + case GLOBALRTC: + client = new GlobalBindingRTCClient(providerRegistry, input.getPayloadSize().intValue()); + break; - default: - LOG.error("Unsupported server/client type {}", input.getOperation()); - throw new IllegalArgumentException("Unsupported server/client type" + input.getOperation()); + default: + LOG.error("Unsupported server/client type {}", input.getOperation()); + throw new IllegalArgumentException("Unsupported server/client type" + input.getOperation()); } try { ExecutorService executor = Executors.newFixedThreadPool(input.getNumClients().intValue()); - final Runnable testRun = new Runnable() { - @Override - public void run() { - client.runTest(input.getIterations().intValue()); - } - }; + final Runnable testRun = () -> client.runTest(input.getIterations().intValue()); LOG.info("Test Started"); - long startTime = System.nanoTime(); + final long startTime = System.nanoTime(); - for (int i = 0; i < input.getNumClients().intValue(); i++ ) { - executor.submit(testRun); + for (int i = 0; i < input.getNumClients().intValue(); i++) { + // FIXME: fools RV_RETURN_VALUE_IGNORED_BAD_PRACTICE, but we should check more + verifyNotNull(executor.submit(testRun)); } executor.shutdown(); try { - executor.awaitTermination(testTimeout, TimeUnit.MINUTES); - } catch (InterruptedException e) { - LOG.error("Out of time: test did not finish within the {} min deadline ", testTimeout); + executor.awaitTermination(TEST_TIMEOUT, TimeUnit.MINUTES); + } catch (final InterruptedException e) { + LOG.error("Out of time: test did not finish within the {} min deadline ", TEST_TIMEOUT); } long endTime = System.nanoTime(); @@ -133,10 +124,11 @@ public class RpcbenchmarkProvider implements BindingAwareProvider, AutoCloseable StartTestOutput output = new StartTestOutputBuilder() .setRate((long)0) - .setGlobalRtcClientError((long)client.getRpcError()) - .setGlobalRtcClientOk((long)client.getRpcOk()) + .setGlobalRtcClientError(client.getRpcError()) + .setGlobalRtcClientOk(client.getRpcOk()) .setExecTime(TimeUnit.NANOSECONDS.toMillis(elapsedTime)) - .setRate(((client.getRpcOk() + client.getRpcError()) * 1000000000) / elapsedTime) + .setRate( + (client.getRpcOk() + client.getRpcError()) * 1000000000 / elapsedTime) .build(); return RpcResultBuilder.success(output).buildFuture(); } finally { @@ -147,10 +139,10 @@ public class RpcbenchmarkProvider implements BindingAwareProvider, AutoCloseable } @Override - public Future> testStatus() { + public ListenableFuture> testStatus(final TestStatusInput input) { LOG.info("testStatus"); TestStatusOutput output = new TestStatusOutputBuilder() - .setGlobalServerCnt((long)gServer.getNumRpcs()) + .setGlobalServerCnt((long)globalServer.getNumRpcs()) .setExecStatus(execStatus.get()) .build(); return RpcResultBuilder.success(output).buildFuture();