Bump versions 9.0.4-SNAPSHOT
[controller.git] / benchmark / rpcbenchmark / src / main / java / rpcbenchmark / impl / RpcbenchmarkProvider.java
index a0cc1a16dc1b829d543d1699f96a3d0059bd688d..c785e9b384134eb023833364961ecc68566e1eca 100644 (file)
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package rpcbenchmark.impl;
 
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.ListenableFuture;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
-import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
-import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.NodeContext;
-import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.RpcbenchPayloadService;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.mdsal.binding.api.RpcService;
 import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.RpcbenchRpcRoutes;
 import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.rpcbench.rpc.routes.RpcRoute;
 import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.rpcbench.rpc.routes.RpcRouteKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.RpcbenchmarkService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTest;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTestInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTestOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTestOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusOutput.ExecStatus;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusOutputBuilder;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.Uint32;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.RequireServiceComponentRuntime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RpcbenchmarkProvider implements BindingAwareProvider, AutoCloseable, RpcbenchmarkService {
-
+@Singleton
+@Component(service = {})
+@RequireServiceComponentRuntime
+public final class RpcbenchmarkProvider implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(RpcbenchmarkProvider.class);
-    private static final GlobalBindingRTCServer gServer = new GlobalBindingRTCServer();
-    private static final int testTimeout = 5;
-    private final AtomicReference<ExecStatus> execStatus = new AtomicReference<>(ExecStatus.Idle);
-    private RpcConsumerRegistry consumerRegistry;
-    private RpcProviderRegistry providerRegistry;
+    private static final int TEST_TIMEOUT = 5;
 
-    @Override
-    public void onSessionInitiated(final ProviderContext session) {
-        LOG.debug("RpcbenchmarkProvider Session Initiated");
-        consumerRegistry = session.getSALService(RpcConsumerRegistry.class);
-        providerRegistry = session.getSALService(RpcProviderRegistry.class);
-
-        // Register the benchmark Global RPC
-        session.addRpcImplementation(RpcbenchPayloadService.class, gServer);
-        // Register RPC Benchmark's control REST API
-        session.addRpcImplementation(RpcbenchmarkService.class, this);
+    private final AtomicReference<ExecStatus> execStatus = new AtomicReference<>(ExecStatus.Idle);
+    private final RpcProviderService providerRegistry;
+    private final RpcService consumerRegistry;
+    private final GlobalBindingRTCServer globalServer;
+    private final Registration reg;
+
+    @Inject
+    @Activate
+    public RpcbenchmarkProvider(@Reference final RpcProviderService providerRegistry,
+            @Reference final RpcService consumerRegistry) {
+        this.providerRegistry = requireNonNull(providerRegistry);
+        this.consumerRegistry = requireNonNull(consumerRegistry);
+        globalServer = new GlobalBindingRTCServer(providerRegistry);
+        reg = providerRegistry.registerRpcImplementations((TestStatus) this::testStatus, (StartTest) this::startTest);
+        LOG.info("RpcbenchmarkProvider initiated");
     }
 
     @Override
-    public void close() throws Exception {
-        LOG.debug("RpcbenchmarkProvider Closed");
+    @Deactivate
+    @PreDestroy
+    public void close() {
+        globalServer.close();
+        reg.close();
+        LOG.info("RpcbenchmarkProvider closed");
     }
 
-    @Override
-    public Future<RpcResult<StartTestOutput>> startTest(final StartTestInput input) {
+    private ListenableFuture<RpcResult<StartTestOutput>> startTest(final StartTestInput input) {
         LOG.debug("startTest {}", input);
 
         final RTCClient client;
-        final List<RoutedRpcRegistration<?>> rpcRegs = new ArrayList<>();
+        RoutedBindingRTCServer routed = null;
 
         switch (input.getOperation()) {
-        case ROUTEDRTC:
-            List<InstanceIdentifier<?>> routeIid = new ArrayList<>();
-            for (int i = 0; i < input.getNumServers().intValue(); i++) {
-                GlobalBindingRTCServer server = new GlobalBindingRTCServer();
-                RoutedRpcRegistration<RpcbenchPayloadService> routedReg =
-                        providerRegistry.addRoutedRpcImplementation(RpcbenchPayloadService.class, server);
-
-                KeyedInstanceIdentifier<RpcRoute, RpcRouteKey> iid =
-                        InstanceIdentifier
-                            .create(RpcbenchRpcRoutes.class)
-                            .child(RpcRoute.class, new RpcRouteKey(Integer.toString(i)));
-                routeIid.add(iid);
-                routedReg.registerPath(NodeContext.class, iid);
-                rpcRegs.add(routedReg);
-            }
+            case ROUTEDRTC:
+                List<InstanceIdentifier<?>> routeIid = new ArrayList<>();
+                for (int i = 0; i < input.getNumServers().intValue(); i++) {
+                    routeIid.add(InstanceIdentifier.create(RpcbenchRpcRoutes.class)
+                        .child(RpcRoute.class, new RpcRouteKey(Integer.toString(i))));
+                }
 
-            client = new RoutedBindingRTClient(consumerRegistry, input.getPayloadSize().intValue(), routeIid);
-            break;
+                routed = new RoutedBindingRTCServer(providerRegistry, Set.copyOf(routeIid));
+                client = new RoutedBindingRTClient(consumerRegistry, input.getPayloadSize().intValue(), routeIid);
+                break;
 
-        case GLOBALRTC:
-            client = new GlobalBindingRTCClient(consumerRegistry, input.getPayloadSize().intValue());
-            break;
+            case GLOBALRTC:
+                client = new GlobalBindingRTCClient(consumerRegistry, input.getPayloadSize().intValue());
+                break;
 
-        default:
-            LOG.error("Unsupported server/client type {}", input.getOperation());
-            throw new IllegalArgumentException("Unsupported server/client type" + input.getOperation());
+            default:
+                LOG.error("Unsupported server/client type {}", input.getOperation());
+                throw new IllegalArgumentException("Unsupported server/client type" + input.getOperation());
         }
 
         try {
             ExecutorService executor = Executors.newFixedThreadPool(input.getNumClients().intValue());
 
-            final Runnable testRun = new Runnable() {
-                @Override
-                public void run() {
-                    client.runTest(input.getIterations().intValue());
-                }
-            };
+            final Runnable testRun = () -> client.runTest(input.getIterations().intValue());
 
             LOG.info("Test Started");
-            long startTime = System.nanoTime();
+            final long startTime = System.nanoTime();
 
-            for (int i = 0; i < input.getNumClients().intValue(); i++ ) {
-                executor.submit(testRun);
+            for (int i = 0; i < input.getNumClients().intValue(); i++) {
+                // FIXME: fools RV_RETURN_VALUE_IGNORED_BAD_PRACTICE, but we should check more
+                verifyNotNull(executor.submit(testRun));
             }
 
             executor.shutdown();
             try {
-                executor.awaitTermination(testTimeout, TimeUnit.MINUTES);
+                executor.awaitTermination(TEST_TIMEOUT, TimeUnit.MINUTES);
             } catch (final InterruptedException e) {
-                LOG.error("Out of time: test did not finish within the {} min deadline ", testTimeout); 
+                LOG.error("Out of time: test did not finish within the {} min deadline ", TEST_TIMEOUT);
             }
 
             long endTime = System.nanoTime();
@@ -132,25 +134,25 @@ public class RpcbenchmarkProvider implements BindingAwareProvider, AutoCloseable
             long elapsedTime = endTime - startTime;
 
             StartTestOutput output = new StartTestOutputBuilder()
-                                            .setRate((long)0)
-                                            .setGlobalRtcClientError(client.getRpcError())
-                                            .setGlobalRtcClientOk(client.getRpcOk())
-                                            .setExecTime(TimeUnit.NANOSECONDS.toMillis(elapsedTime))
-                                            .setRate(((client.getRpcOk() + client.getRpcError()) * 1000000000) / elapsedTime)
+                                            .setRate(Uint32.ZERO)
+                                            .setGlobalRtcClientError(Uint32.valueOf(client.getRpcError()))
+                                            .setGlobalRtcClientOk(Uint32.valueOf(client.getRpcOk()))
+                                            .setExecTime(Uint32.valueOf(TimeUnit.NANOSECONDS.toMillis(elapsedTime)))
+                                            .setRate(Uint32.valueOf(
+                                                (client.getRpcOk() + client.getRpcError()) * 1000000000 / elapsedTime))
                                             .build();
             return RpcResultBuilder.success(output).buildFuture();
         } finally {
-            for (RoutedRpcRegistration<?> routedRpcRegistration : rpcRegs) {
-                routedRpcRegistration.close();
+            if (routed != null) {
+                routed.close();
             }
         }
     }
 
-    @Override
-    public Future<RpcResult<TestStatusOutput>> testStatus() {
+    private ListenableFuture<RpcResult<TestStatusOutput>> testStatus(final TestStatusInput input) {
         LOG.info("testStatus");
         TestStatusOutput output = new TestStatusOutputBuilder()
-                                        .setGlobalServerCnt((long)gServer.getNumRpcs())
+                                        .setGlobalServerCnt(Uint32.valueOf(globalServer.getNumRpcs()))
                                         .setExecStatus(execStatus.get())
                                         .build();
         return RpcResultBuilder.success(output).buildFuture();