--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.clustering.it.provider;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StressTestInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.cars.CarEntry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.cars.CarEntryBuilder;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Thomas Pantelis
+ */
+public class CarProvider implements CarService {
+ private static final Logger log = LoggerFactory.getLogger(PurchaseCarProvider.class);
+
+ private final DataBroker dataProvider;
+
+ private volatile Thread testThread;
+ private volatile boolean stopThread;
+
+ public CarProvider(DataBroker dataProvider) {
+ this.dataProvider = dataProvider;
+ }
+
+ private void stopThread() {
+ if(testThread != null) {
+ stopThread = true;
+ testThread.interrupt();
+ try {
+ testThread.join();
+ } catch (InterruptedException e) {}
+ testThread = null;
+ }
+ }
+
+ @Override
+ public Future<RpcResult<Void>> stressTest(StressTestInput input) {
+ log.info("stressTest starting : rate: {}", input.getRate());
+
+ stopThread();
+
+ WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
+ InstanceIdentifier<Cars> carsId = InstanceIdentifier.<Cars>builder(Cars.class).build();
+ tx.merge(LogicalDatastoreType.CONFIGURATION, carsId, new CarsBuilder().build());
+ try {
+ tx.submit().checkedGet(5, TimeUnit.SECONDS);
+ } catch (TransactionCommitFailedException | TimeoutException e) {
+ log.error("Put Cars failed",e);
+ return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+ }
+
+ stopThread = false;
+ final long sleep = TimeUnit.NANOSECONDS.convert(1000,TimeUnit.MILLISECONDS) / input.getRate();
+ final Stopwatch sw = Stopwatch.createUnstarted();
+ testThread = new Thread() {
+ @Override
+ public void run() {
+ sw.start();
+ AtomicLong count = new AtomicLong();
+ while(!stopThread) {
+ long id = count.incrementAndGet();
+ WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
+ CarEntry car = new CarEntryBuilder().setId(new CarId("car"+id)).build();
+ tx.put(LogicalDatastoreType.CONFIGURATION,
+ InstanceIdentifier.<Cars>builder(Cars.class).child(CarEntry.class, car.getKey()).build(),
+ car);
+ tx.submit();
+ try {
+ TimeUnit.NANOSECONDS.sleep(sleep);
+ } catch (InterruptedException e) {
+ break;
+ }
+
+ if((count.get() % 1000) == 0) {
+ log.info("Cars created {}, time: {}",count.get(),sw.elapsed(TimeUnit.SECONDS));
+ }
+ }
+
+ log.info("Stress test thread stopping");
+ }
+ };
+ testThread.start();
+
+ return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+ }
+
+ @Override
+ public Future<RpcResult<Void>> stopStressTest() {
+ stopThread();
+ return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+ }
+
+}
import org.opendaylight.controller.clustering.it.listener.PeopleCarListener;
+import org.opendaylight.controller.clustering.it.provider.CarProvider;
import org.opendaylight.controller.clustering.it.provider.PeopleProvider;
import org.opendaylight.controller.clustering.it.provider.PurchaseCarProvider;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.purchase.rev140818.CarPurchaseService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.people.rev140818.PeopleService;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.NotificationListener;
people.setRpcRegistration(purchaseCarRpc);
+ CarProvider carProvider = new CarProvider(dataBrokerService);
+ getRpcRegistryDependency().addRpcImplementation(CarService.class, carProvider);
+
final BindingAwareBroker.RpcRegistration<PeopleService> peopleRpcReg = getRpcRegistryDependency()
.addRpcImplementation(PeopleService.class, people);