package rpcbenchmark.impl;
import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.NodeContext;
-import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.RpcbenchPayloadService;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.mdsal.binding.api.RpcService;
import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.RpcbenchRpcRoutes;
import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.rpcbench.rpc.routes.RpcRoute;
import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.rpcbench.rpc.routes.RpcRouteKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.RpcbenchmarkService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTest;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTestInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTestOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTestOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatus;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusOutput.ExecStatus;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusOutputBuilder;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.Uint32;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.RequireServiceComponentRuntime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RpcbenchmarkProvider implements AutoCloseable, RpcbenchmarkService {
-
+@Singleton
+@Component(service = {})
+@RequireServiceComponentRuntime
+public final class RpcbenchmarkProvider implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(RpcbenchmarkProvider.class);
private static final int TEST_TIMEOUT = 5;
- private final GlobalBindingRTCServer globalServer;
private final AtomicReference<ExecStatus> execStatus = new AtomicReference<>(ExecStatus.Idle);
- private final RpcProviderRegistry providerRegistry;
-
- public RpcbenchmarkProvider(final RpcProviderRegistry providerRegistry, final GlobalBindingRTCServer globalServer) {
- this.providerRegistry = providerRegistry;
- this.globalServer = globalServer;
- }
-
- public void init() {
+ private final RpcProviderService providerRegistry;
+ private final RpcService consumerRegistry;
+ private final GlobalBindingRTCServer globalServer;
+ private final Registration reg;
+
+ @Inject
+ @Activate
+ public RpcbenchmarkProvider(@Reference final RpcProviderService providerRegistry,
+ @Reference final RpcService consumerRegistry) {
+ this.providerRegistry = requireNonNull(providerRegistry);
+ this.consumerRegistry = requireNonNull(consumerRegistry);
+ globalServer = new GlobalBindingRTCServer(providerRegistry);
+ reg = providerRegistry.registerRpcImplementations((TestStatus) this::testStatus, (StartTest) this::startTest);
LOG.info("RpcbenchmarkProvider initiated");
}
@Override
+ @Deactivate
+ @PreDestroy
public void close() {
+ globalServer.close();
+ reg.close();
LOG.info("RpcbenchmarkProvider closed");
}
- @Override
- public ListenableFuture<RpcResult<StartTestOutput>> startTest(final StartTestInput input) {
+ private ListenableFuture<RpcResult<StartTestOutput>> startTest(final StartTestInput input) {
LOG.debug("startTest {}", input);
final RTCClient client;
- final List<RoutedRpcRegistration<?>> rpcRegs = new ArrayList<>();
+ RoutedBindingRTCServer routed = null;
switch (input.getOperation()) {
case ROUTEDRTC:
List<InstanceIdentifier<?>> routeIid = new ArrayList<>();
for (int i = 0; i < input.getNumServers().intValue(); i++) {
- GlobalBindingRTCServer server = new GlobalBindingRTCServer();
- RoutedRpcRegistration<RpcbenchPayloadService> routedReg =
- providerRegistry.addRoutedRpcImplementation(RpcbenchPayloadService.class, server);
-
- KeyedInstanceIdentifier<RpcRoute, RpcRouteKey> iid =
- InstanceIdentifier
- .create(RpcbenchRpcRoutes.class)
- .child(RpcRoute.class, new RpcRouteKey(Integer.toString(i)));
- routeIid.add(iid);
- routedReg.registerPath(NodeContext.class, iid);
- rpcRegs.add(routedReg);
+ routeIid.add(InstanceIdentifier.create(RpcbenchRpcRoutes.class)
+ .child(RpcRoute.class, new RpcRouteKey(Integer.toString(i))));
}
- client = new RoutedBindingRTClient(providerRegistry, input.getPayloadSize().intValue(), routeIid);
+ routed = new RoutedBindingRTCServer(providerRegistry, Set.copyOf(routeIid));
+ client = new RoutedBindingRTClient(consumerRegistry, input.getPayloadSize().intValue(), routeIid);
break;
case GLOBALRTC:
- client = new GlobalBindingRTCClient(providerRegistry, input.getPayloadSize().intValue());
+ client = new GlobalBindingRTCClient(consumerRegistry, input.getPayloadSize().intValue());
break;
default:
long elapsedTime = endTime - startTime;
StartTestOutput output = new StartTestOutputBuilder()
- .setRate((long)0)
- .setGlobalRtcClientError(client.getRpcError())
- .setGlobalRtcClientOk(client.getRpcOk())
- .setExecTime(TimeUnit.NANOSECONDS.toMillis(elapsedTime))
- .setRate(
- (client.getRpcOk() + client.getRpcError()) * 1000000000 / elapsedTime)
+ .setRate(Uint32.ZERO)
+ .setGlobalRtcClientError(Uint32.valueOf(client.getRpcError()))
+ .setGlobalRtcClientOk(Uint32.valueOf(client.getRpcOk()))
+ .setExecTime(Uint32.valueOf(TimeUnit.NANOSECONDS.toMillis(elapsedTime)))
+ .setRate(Uint32.valueOf(
+ (client.getRpcOk() + client.getRpcError()) * 1000000000 / elapsedTime))
.build();
return RpcResultBuilder.success(output).buildFuture();
} finally {
- for (RoutedRpcRegistration<?> routedRpcRegistration : rpcRegs) {
- routedRpcRegistration.close();
+ if (routed != null) {
+ routed.close();
}
}
}
- @Override
- public ListenableFuture<RpcResult<TestStatusOutput>> testStatus(final TestStatusInput input) {
+ private ListenableFuture<RpcResult<TestStatusOutput>> testStatus(final TestStatusInput input) {
LOG.info("testStatus");
TestStatusOutput output = new TestStatusOutputBuilder()
- .setGlobalServerCnt((long)globalServer.getNumRpcs())
+ .setGlobalServerCnt(Uint32.valueOf(globalServer.getNumRpcs()))
.setExecStatus(execStatus.get())
.build();
return RpcResultBuilder.success(output).buildFuture();