X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=benchmark%2Frpcbenchmark%2Fsrc%2Fmain%2Fjava%2Frpcbenchmark%2Fimpl%2FRpcbenchmarkProvider.java;h=c785e9b384134eb023833364961ecc68566e1eca;hb=1c79e662c75f327e4d1953710cc78b70780ef4e1;hp=8e8e31e97527ef09cb1749cb6b82cb0c64ec9df9;hpb=258d8039ac144aeee2efa7943228c0fc6cdaf651;p=controller.git diff --git a/benchmark/rpcbenchmark/src/main/java/rpcbenchmark/impl/RpcbenchmarkProvider.java b/benchmark/rpcbenchmark/src/main/java/rpcbenchmark/impl/RpcbenchmarkProvider.java index 8e8e31e975..c785e9b384 100644 --- a/benchmark/rpcbenchmark/src/main/java/rpcbenchmark/impl/RpcbenchmarkProvider.java +++ b/benchmark/rpcbenchmark/src/main/java/rpcbenchmark/impl/RpcbenchmarkProvider.java @@ -7,92 +7,105 @@ */ package rpcbenchmark.impl; +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; + import com.google.common.util.concurrent.ListenableFuture; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; -import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.NodeContext; -import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.RpcbenchPayloadService; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.opendaylight.mdsal.binding.api.RpcProviderService; +import org.opendaylight.mdsal.binding.api.RpcService; import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.RpcbenchRpcRoutes; import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.rpcbench.rpc.routes.RpcRoute; import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.rpcbench.rpc.routes.RpcRouteKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.RpcbenchmarkService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTest; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTestInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTestOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTestOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatus; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusOutput.ExecStatus; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusOutputBuilder; +import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.common.Uint32; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; +import org.osgi.service.component.annotations.RequireServiceComponentRuntime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class RpcbenchmarkProvider implements AutoCloseable, RpcbenchmarkService { - +@Singleton +@Component(service = {}) +@RequireServiceComponentRuntime +public final class RpcbenchmarkProvider implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(RpcbenchmarkProvider.class); - private static final int testTimeout = 5; + private static final int TEST_TIMEOUT = 5; - private final GlobalBindingRTCServer globalServer; private final AtomicReference execStatus = new AtomicReference<>(ExecStatus.Idle); - private final RpcProviderRegistry providerRegistry; - - public RpcbenchmarkProvider(final RpcProviderRegistry providerRegistry, final GlobalBindingRTCServer globalServer) { - this.providerRegistry = providerRegistry; - this.globalServer = globalServer; - } - - public void init() { + private final RpcProviderService providerRegistry; + private final RpcService consumerRegistry; + private final GlobalBindingRTCServer globalServer; + private final Registration reg; + + @Inject + @Activate + public RpcbenchmarkProvider(@Reference final RpcProviderService providerRegistry, + @Reference final RpcService consumerRegistry) { + this.providerRegistry = requireNonNull(providerRegistry); + this.consumerRegistry = requireNonNull(consumerRegistry); + globalServer = new GlobalBindingRTCServer(providerRegistry); + reg = providerRegistry.registerRpcImplementations((TestStatus) this::testStatus, (StartTest) this::startTest); LOG.info("RpcbenchmarkProvider initiated"); } @Override + @Deactivate + @PreDestroy public void close() { + globalServer.close(); + reg.close(); LOG.info("RpcbenchmarkProvider closed"); } - @Override - public ListenableFuture> startTest(final StartTestInput input) { + private ListenableFuture> startTest(final StartTestInput input) { LOG.debug("startTest {}", input); final RTCClient client; - final List> rpcRegs = new ArrayList<>(); + RoutedBindingRTCServer routed = null; switch (input.getOperation()) { - case ROUTEDRTC: - List> routeIid = new ArrayList<>(); - for (int i = 0; i < input.getNumServers().intValue(); i++) { - GlobalBindingRTCServer server = new GlobalBindingRTCServer(); - RoutedRpcRegistration routedReg = - providerRegistry.addRoutedRpcImplementation(RpcbenchPayloadService.class, server); - - KeyedInstanceIdentifier iid = - InstanceIdentifier - .create(RpcbenchRpcRoutes.class) - .child(RpcRoute.class, new RpcRouteKey(Integer.toString(i))); - routeIid.add(iid); - routedReg.registerPath(NodeContext.class, iid); - rpcRegs.add(routedReg); - } - - client = new RoutedBindingRTClient(providerRegistry, input.getPayloadSize().intValue(), routeIid); - break; - - case GLOBALRTC: - client = new GlobalBindingRTCClient(providerRegistry, input.getPayloadSize().intValue()); - break; - - default: - LOG.error("Unsupported server/client type {}", input.getOperation()); - throw new IllegalArgumentException("Unsupported server/client type" + input.getOperation()); + case ROUTEDRTC: + List> routeIid = new ArrayList<>(); + for (int i = 0; i < input.getNumServers().intValue(); i++) { + routeIid.add(InstanceIdentifier.create(RpcbenchRpcRoutes.class) + .child(RpcRoute.class, new RpcRouteKey(Integer.toString(i)))); + } + + routed = new RoutedBindingRTCServer(providerRegistry, Set.copyOf(routeIid)); + client = new RoutedBindingRTClient(consumerRegistry, input.getPayloadSize().intValue(), routeIid); + break; + + case GLOBALRTC: + client = new GlobalBindingRTCClient(consumerRegistry, input.getPayloadSize().intValue()); + break; + + default: + LOG.error("Unsupported server/client type {}", input.getOperation()); + throw new IllegalArgumentException("Unsupported server/client type" + input.getOperation()); } try { @@ -101,17 +114,18 @@ public class RpcbenchmarkProvider implements AutoCloseable, RpcbenchmarkService final Runnable testRun = () -> client.runTest(input.getIterations().intValue()); LOG.info("Test Started"); - long startTime = System.nanoTime(); + final long startTime = System.nanoTime(); - for (int i = 0; i < input.getNumClients().intValue(); i++ ) { - executor.submit(testRun); + for (int i = 0; i < input.getNumClients().intValue(); i++) { + // FIXME: fools RV_RETURN_VALUE_IGNORED_BAD_PRACTICE, but we should check more + verifyNotNull(executor.submit(testRun)); } executor.shutdown(); try { - executor.awaitTermination(testTimeout, TimeUnit.MINUTES); + executor.awaitTermination(TEST_TIMEOUT, TimeUnit.MINUTES); } catch (final InterruptedException e) { - LOG.error("Out of time: test did not finish within the {} min deadline ", testTimeout); + LOG.error("Out of time: test did not finish within the {} min deadline ", TEST_TIMEOUT); } long endTime = System.nanoTime(); @@ -120,25 +134,25 @@ public class RpcbenchmarkProvider implements AutoCloseable, RpcbenchmarkService long elapsedTime = endTime - startTime; StartTestOutput output = new StartTestOutputBuilder() - .setRate((long)0) - .setGlobalRtcClientError(client.getRpcError()) - .setGlobalRtcClientOk(client.getRpcOk()) - .setExecTime(TimeUnit.NANOSECONDS.toMillis(elapsedTime)) - .setRate((client.getRpcOk() + client.getRpcError()) * 1000000000 / elapsedTime) + .setRate(Uint32.ZERO) + .setGlobalRtcClientError(Uint32.valueOf(client.getRpcError())) + .setGlobalRtcClientOk(Uint32.valueOf(client.getRpcOk())) + .setExecTime(Uint32.valueOf(TimeUnit.NANOSECONDS.toMillis(elapsedTime))) + .setRate(Uint32.valueOf( + (client.getRpcOk() + client.getRpcError()) * 1000000000 / elapsedTime)) .build(); return RpcResultBuilder.success(output).buildFuture(); } finally { - for (RoutedRpcRegistration routedRpcRegistration : rpcRegs) { - routedRpcRegistration.close(); + if (routed != null) { + routed.close(); } } } - @Override - public ListenableFuture> testStatus(final TestStatusInput input) { + private ListenableFuture> testStatus(final TestStatusInput input) { LOG.info("testStatus"); TestStatusOutput output = new TestStatusOutputBuilder() - .setGlobalServerCnt((long)globalServer.getNumRpcs()) + .setGlobalServerCnt(Uint32.valueOf(globalServer.getNumRpcs())) .setExecStatus(execStatus.get()) .build(); return RpcResultBuilder.success(output).buildFuture();