rpcbenchmark: use lambdas
[controller.git] / benchmark / rpcbenchmark / src / main / java / rpcbenchmark / impl / RpcbenchmarkProvider.java
1 /*
2  * Copyright (c) 2015 Cisco Systems Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package rpcbenchmark.impl;
10
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.Future;
16 import java.util.concurrent.TimeUnit;
17 import java.util.concurrent.atomic.AtomicReference;
18
19 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
20 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
21 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
22 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
23 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
24 import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.NodeContext;
25 import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.RpcbenchPayloadService;
26 import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.RpcbenchRpcRoutes;
27 import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.rpcbench.rpc.routes.RpcRoute;
28 import org.opendaylight.yang.gen.v1.rpcbench.payload.rev150702.rpcbench.rpc.routes.RpcRouteKey;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.RpcbenchmarkService;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTestInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTestOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.StartTestOutputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusOutput.ExecStatus;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.rpcbenchmark.rev150702.TestStatusOutputBuilder;
36 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
37 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
38 import org.opendaylight.yangtools.yang.common.RpcResult;
39 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 public class RpcbenchmarkProvider implements BindingAwareProvider, AutoCloseable, RpcbenchmarkService {
44
45     private static final Logger LOG = LoggerFactory.getLogger(RpcbenchmarkProvider.class);
46     private static final GlobalBindingRTCServer gServer = new GlobalBindingRTCServer();
47     private static final int testTimeout = 5;
48     private final AtomicReference<ExecStatus> execStatus = new AtomicReference<>(ExecStatus.Idle);
49     private RpcConsumerRegistry consumerRegistry;
50     private RpcProviderRegistry providerRegistry;
51
52     @Override
53     public void onSessionInitiated(final ProviderContext session) {
54         LOG.debug("RpcbenchmarkProvider Session Initiated");
55         consumerRegistry = session.getSALService(RpcConsumerRegistry.class);
56         providerRegistry = session.getSALService(RpcProviderRegistry.class);
57
58         // Register the benchmark Global RPC
59         session.addRpcImplementation(RpcbenchPayloadService.class, gServer);
60         // Register RPC Benchmark's control REST API
61         session.addRpcImplementation(RpcbenchmarkService.class, this);
62     }
63
64     @Override
65     public void close() throws Exception {
66         LOG.debug("RpcbenchmarkProvider Closed");
67     }
68
69     @Override
70     public Future<RpcResult<StartTestOutput>> startTest(final StartTestInput input) {
71         LOG.debug("startTest {}", input);
72
73         final RTCClient client;
74         final List<RoutedRpcRegistration<?>> rpcRegs = new ArrayList<>();
75
76         switch (input.getOperation()) {
77         case ROUTEDRTC:
78             List<InstanceIdentifier<?>> routeIid = new ArrayList<>();
79             for (int i = 0; i < input.getNumServers().intValue(); i++) {
80                 GlobalBindingRTCServer server = new GlobalBindingRTCServer();
81                 RoutedRpcRegistration<RpcbenchPayloadService> routedReg =
82                         providerRegistry.addRoutedRpcImplementation(RpcbenchPayloadService.class, server);
83
84                 KeyedInstanceIdentifier<RpcRoute, RpcRouteKey> iid =
85                         InstanceIdentifier
86                             .create(RpcbenchRpcRoutes.class)
87                             .child(RpcRoute.class, new RpcRouteKey(Integer.toString(i)));
88                 routeIid.add(iid);
89                 routedReg.registerPath(NodeContext.class, iid);
90                 rpcRegs.add(routedReg);
91             }
92
93             client = new RoutedBindingRTClient(consumerRegistry, input.getPayloadSize().intValue(), routeIid);
94             break;
95
96         case GLOBALRTC:
97             client = new GlobalBindingRTCClient(consumerRegistry, input.getPayloadSize().intValue());
98             break;
99
100         default:
101             LOG.error("Unsupported server/client type {}", input.getOperation());
102             throw new IllegalArgumentException("Unsupported server/client type" + input.getOperation());
103         }
104
105         try {
106             ExecutorService executor = Executors.newFixedThreadPool(input.getNumClients().intValue());
107
108             final Runnable testRun = () -> client.runTest(input.getIterations().intValue());
109
110             LOG.info("Test Started");
111             long startTime = System.nanoTime();
112
113             for (int i = 0; i < input.getNumClients().intValue(); i++ ) {
114                 executor.submit(testRun);
115             }
116
117             executor.shutdown();
118             try {
119                 executor.awaitTermination(testTimeout, TimeUnit.MINUTES);
120             } catch (final InterruptedException e) {
121                 LOG.error("Out of time: test did not finish within the {} min deadline ", testTimeout); 
122             }
123
124             long endTime = System.nanoTime();
125             LOG.info("Test Done");
126
127             long elapsedTime = endTime - startTime;
128
129             StartTestOutput output = new StartTestOutputBuilder()
130                                             .setRate((long)0)
131                                             .setGlobalRtcClientError(client.getRpcError())
132                                             .setGlobalRtcClientOk(client.getRpcOk())
133                                             .setExecTime(TimeUnit.NANOSECONDS.toMillis(elapsedTime))
134                                             .setRate(((client.getRpcOk() + client.getRpcError()) * 1000000000) / elapsedTime)
135                                             .build();
136             return RpcResultBuilder.success(output).buildFuture();
137         } finally {
138             for (RoutedRpcRegistration<?> routedRpcRegistration : rpcRegs) {
139                 routedRpcRegistration.close();
140             }
141         }
142     }
143
144     @Override
145     public Future<RpcResult<TestStatusOutput>> testStatus() {
146         LOG.info("testStatus");
147         TestStatusOutput output = new TestStatusOutputBuilder()
148                                         .setGlobalServerCnt((long)gServer.getNumRpcs())
149                                         .setExecStatus(execStatus.get())
150                                         .build();
151         return RpcResultBuilder.success(output).buildFuture();
152     }
153
154 }