X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=benchmark%2Frpcbenchmark%2Fsrc%2Fmain%2Fjava%2Frpcbenchmark%2Fimpl%2FRpcbenchmarkProvider.java;h=c785e9b384134eb023833364961ecc68566e1eca;hb=HEAD;hp=14358eac0e174000d56109dc0c9ee6f85f67a8fc;hpb=f1df4d4b606bd729b9f13b7ed19e35203708645d;p=controller.git diff --git a/benchmark/rpcbenchmark/src/main/java/rpcbenchmark/impl/RpcbenchmarkProvider.java b/benchmark/rpcbenchmark/src/main/java/rpcbenchmark/impl/RpcbenchmarkProvider.java index 14358eac0e..c785e9b384 100644 --- a/benchmark/rpcbenchmark/src/main/java/rpcbenchmark/impl/RpcbenchmarkProvider.java +++ b/benchmark/rpcbenchmark/src/main/java/rpcbenchmark/impl/RpcbenchmarkProvider.java @@ -5,125 +5,127 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package rpcbenchmark.impl; +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; + +import com.google.common.util.concurrent.ListenableFuture; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; - -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext; -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration; -import org.opendaylight.controller.sal.binding.api.BindingAwareProvider; -import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; -import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.NodeContext; -import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.RpcbenchPayloadService; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.opendaylight.mdsal.binding.api.RpcProviderService; +import org.opendaylight.mdsal.binding.api.RpcService; import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.RpcbenchRpcRoutes; import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.rpcbench.rpc.routes.RpcRoute; import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.rpcbench.rpc.routes.RpcRouteKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.RpcbenchmarkService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTest; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTestInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTestOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTestOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatus; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusOutput.ExecStatus; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusOutputBuilder; +import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.common.Uint32; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; +import org.osgi.service.component.annotations.RequireServiceComponentRuntime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class RpcbenchmarkProvider implements BindingAwareProvider, AutoCloseable, RpcbenchmarkService { - +@Singleton +@Component(service = {}) +@RequireServiceComponentRuntime +public final class RpcbenchmarkProvider implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(RpcbenchmarkProvider.class); - private static final GlobalBindingRTCServer gServer = new GlobalBindingRTCServer(); - private static final int testTimeout = 5; - private final AtomicReference execStatus = new AtomicReference<>(ExecStatus.Idle); - private RpcConsumerRegistry consumerRegistry; - private RpcProviderRegistry providerRegistry; + private static final int TEST_TIMEOUT = 5; - @Override - public void onSessionInitiated(ProviderContext session) { - LOG.info("RpcbenchmarkProvider Session Initiated"); - consumerRegistry = session.getSALService(RpcConsumerRegistry.class); - providerRegistry = session.getSALService(RpcProviderRegistry.class); - - // Register the benchmark Global RPC - session.addRpcImplementation(RpcbenchPayloadService.class, gServer); - // Register RPC Benchmark's control REST API - session.addRpcImplementation(RpcbenchmarkService.class, this); + private final AtomicReference execStatus = new AtomicReference<>(ExecStatus.Idle); + private final RpcProviderService providerRegistry; + private final RpcService consumerRegistry; + private final GlobalBindingRTCServer globalServer; + private final Registration reg; + + @Inject + @Activate + public RpcbenchmarkProvider(@Reference final RpcProviderService providerRegistry, + @Reference final RpcService consumerRegistry) { + this.providerRegistry = requireNonNull(providerRegistry); + this.consumerRegistry = requireNonNull(consumerRegistry); + globalServer = new GlobalBindingRTCServer(providerRegistry); + reg = providerRegistry.registerRpcImplementations((TestStatus) this::testStatus, (StartTest) this::startTest); + LOG.info("RpcbenchmarkProvider initiated"); } @Override - public void close() throws Exception { - LOG.info("RpcbenchmarkProvider Closed"); + @Deactivate + @PreDestroy + public void close() { + globalServer.close(); + reg.close(); + LOG.info("RpcbenchmarkProvider closed"); } - @Override - public Future> startTest(final StartTestInput input) { - LOG.info("startTest {}", input); + private ListenableFuture> startTest(final StartTestInput input) { + LOG.debug("startTest {}", input); final RTCClient client; - final List> rpcRegs = new ArrayList<>(); + RoutedBindingRTCServer routed = null; switch (input.getOperation()) { - case ROUTEDRTC: - List> routeIid = new ArrayList<>(); - for (int i = 0; i < input.getNumServers().intValue(); i++) { - GlobalBindingRTCServer server = new GlobalBindingRTCServer(); - RoutedRpcRegistration routedReg = - providerRegistry.addRoutedRpcImplementation(RpcbenchPayloadService.class, server); - - KeyedInstanceIdentifier iid = - InstanceIdentifier - .create(RpcbenchRpcRoutes.class) - .child(RpcRoute.class, new RpcRouteKey(Integer.toString(i))); - routeIid.add(iid); - routedReg.registerPath(NodeContext.class, iid); - rpcRegs.add(routedReg); - } + case ROUTEDRTC: + List> routeIid = new ArrayList<>(); + for (int i = 0; i < input.getNumServers().intValue(); i++) { + routeIid.add(InstanceIdentifier.create(RpcbenchRpcRoutes.class) + .child(RpcRoute.class, new RpcRouteKey(Integer.toString(i)))); + } - client = new RoutedBindingRTClient(consumerRegistry, input.getPayloadSize().intValue(), routeIid); - break; + routed = new RoutedBindingRTCServer(providerRegistry, Set.copyOf(routeIid)); + client = new RoutedBindingRTClient(consumerRegistry, input.getPayloadSize().intValue(), routeIid); + break; - case GLOBALRTC: - client = new GlobalBindingRTCClient(consumerRegistry, input.getPayloadSize().intValue()); - break; + case GLOBALRTC: + client = new GlobalBindingRTCClient(consumerRegistry, input.getPayloadSize().intValue()); + break; - default: - LOG.error("Unsupported server/client type {}", input.getOperation()); - throw new IllegalArgumentException("Unsupported server/client type" + input.getOperation()); + default: + LOG.error("Unsupported server/client type {}", input.getOperation()); + throw new IllegalArgumentException("Unsupported server/client type" + input.getOperation()); } try { ExecutorService executor = Executors.newFixedThreadPool(input.getNumClients().intValue()); - final Runnable testRun = new Runnable() { - @Override - public void run() { - client.runTest(input.getIterations().intValue()); - } - }; + final Runnable testRun = () -> client.runTest(input.getIterations().intValue()); LOG.info("Test Started"); - long startTime = System.nanoTime(); + final long startTime = System.nanoTime(); - for (int i = 0; i < input.getNumClients().intValue(); i++ ) { - executor.submit(testRun); + for (int i = 0; i < input.getNumClients().intValue(); i++) { + // FIXME: fools RV_RETURN_VALUE_IGNORED_BAD_PRACTICE, but we should check more + verifyNotNull(executor.submit(testRun)); } executor.shutdown(); try { - executor.awaitTermination(testTimeout, TimeUnit.MINUTES); - } catch (InterruptedException e) { - LOG.error("Out of time: test did not finish within the {} min deadline ", testTimeout); + executor.awaitTermination(TEST_TIMEOUT, TimeUnit.MINUTES); + } catch (final InterruptedException e) { + LOG.error("Out of time: test did not finish within the {} min deadline ", TEST_TIMEOUT); } long endTime = System.nanoTime(); @@ -132,25 +134,25 @@ public class RpcbenchmarkProvider implements BindingAwareProvider, AutoCloseable long elapsedTime = endTime - startTime; StartTestOutput output = new StartTestOutputBuilder() - .setRate((long)0) - .setGlobalRtcClientError(client.getRpcError()) - .setGlobalRtcClientOk(client.getRpcOk()) - .setExecTime(TimeUnit.NANOSECONDS.toMillis(elapsedTime)) - .setRate(((client.getRpcOk() + client.getRpcError()) * 1000000000) / elapsedTime) + .setRate(Uint32.ZERO) + .setGlobalRtcClientError(Uint32.valueOf(client.getRpcError())) + .setGlobalRtcClientOk(Uint32.valueOf(client.getRpcOk())) + .setExecTime(Uint32.valueOf(TimeUnit.NANOSECONDS.toMillis(elapsedTime))) + .setRate(Uint32.valueOf( + (client.getRpcOk() + client.getRpcError()) * 1000000000 / elapsedTime)) .build(); return RpcResultBuilder.success(output).buildFuture(); } finally { - for (RoutedRpcRegistration routedRpcRegistration : rpcRegs) { - routedRpcRegistration.close(); + if (routed != null) { + routed.close(); } } } - @Override - public Future> testStatus() { + private ListenableFuture> testStatus(final TestStatusInput input) { LOG.info("testStatus"); TestStatusOutput output = new TestStatusOutputBuilder() - .setGlobalServerCnt((long)gServer.getNumRpcs()) + .setGlobalServerCnt(Uint32.valueOf(globalServer.getNumRpcs())) .setExecStatus(execStatus.get()) .build(); return RpcResultBuilder.success(output).buildFuture();