/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.clustering.it.provider;

import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Futures;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarsBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StressTestInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.cars.CarEntry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.cars.CarEntryBuilder;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link CarService} implementation that runs a simple write stress test: a
 * background thread repeatedly puts new {@link CarEntry} items into the
 * CONFIGURATION datastore at a caller-supplied rate until it is told to stop.
 *
 * <p>At most one stress-test thread is tracked at a time; starting a new test
 * first stops the previously started one.
 *
 * @author Thomas Pantelis
 */
public class CarProvider implements CarService {
    private static final Logger log = LoggerFactory.getLogger(CarProvider.class);

    /** Nanoseconds in the one-second window over which the requested rate is spread. */
    private static final long NANOS_PER_SECOND = TimeUnit.NANOSECONDS.convert(1000, TimeUnit.MILLISECONDS);

    /** How long to wait for the initial top-level {@code Cars} container commit. */
    private static final long INIT_COMMIT_TIMEOUT_SECONDS = 5;

    /** Progress is logged every time this many cars have been submitted. */
    private static final long PROGRESS_LOG_INTERVAL = 1000;

    private final DataBroker dataProvider;

    /** The currently tracked stress-test thread, or {@code null} when none was started. */
    private volatile Thread testThread;

    /** Cooperative stop flag polled by the stress-test thread on every iteration. */
    private volatile boolean stopThread;

    /**
     * Creates the provider.
     *
     * @param dataProvider broker used to create the write transactions
     */
    public CarProvider(DataBroker dataProvider) {
        this.dataProvider = dataProvider;
    }

    /**
     * Signals the tracked stress-test thread (if any) to stop, interrupts it so
     * that a pending sleep ends immediately, and waits for it to terminate.
     */
    private void stopThread() {
        // Work on a single snapshot of the volatile field so the null check and
        // the subsequent calls are guaranteed to see the same thread.
        final Thread running = testThread;
        if (running == null) {
            return;
        }

        stopThread = true;
        running.interrupt();
        try {
            running.join();
        } catch (InterruptedException e) {
            // Do not swallow the interruption of the calling thread: restore the
            // flag so code further up the stack can react to it.
            Thread.currentThread().interrupt();
        }
        testThread = null;
    }

    /**
     * Starts a stress test that creates cars at {@code input.getRate()} cars per
     * second on a background thread. Any previously started test is stopped first.
     *
     * @param input RPC input carrying the requested creation rate
     * @return an already-completed future holding a successful result once the
     *         background thread has been started, or a failed result if the rate
     *         is missing or not positive, or if the initial {@code Cars} container
     *         could not be committed
     */
    @Override
    public Future<RpcResult<Void>> stressTest(StressTestInput input) {
        final Number rate = input.getRate();
        log.info("stressTest starting : rate: {}", rate);

        // The rate is used as a divisor below; reject unusable values up front,
        // before disturbing a test that may already be running.
        if (rate == null || rate.longValue() <= 0) {
            log.error("stressTest rejected, invalid rate: {}", rate);
            return Futures.immediateFuture(RpcResultBuilder.<Void>failed()
                    .withError(RpcError.ErrorType.APPLICATION, "Invalid rate: " + rate).build());
        }

        stopThread();

        // Make sure the top-level Cars container exists before entries are put under it.
        final WriteTransaction initTx = dataProvider.newWriteOnlyTransaction();
        final InstanceIdentifier<Cars> carsId = InstanceIdentifier.builder(Cars.class).build();
        initTx.merge(LogicalDatastoreType.CONFIGURATION, carsId, new CarsBuilder().build());
        try {
            initTx.submit().checkedGet(INIT_COMMIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (TransactionCommitFailedException | TimeoutException e) {
            log.error("Put Cars failed", e);
            // Report the failure to the caller instead of claiming success.
            return Futures.immediateFuture(RpcResultBuilder.<Void>failed()
                    .withError(RpcError.ErrorType.APPLICATION, "Put Cars failed", e).build());
        }

        stopThread = false;
        final long sleepNanos = NANOS_PER_SECOND / rate.longValue();

        final Thread worker = new Thread() {
            @Override
            public void run() {
                createCars(sleepNanos);
            }
        };
        testThread = worker;
        worker.start();

        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
    }

    /**
     * Body of the stress-test thread: submits one car per iteration, pausing
     * {@code sleepNanos} between submissions, until the stop flag is set or the
     * thread is interrupted.
     *
     * @param sleepNanos pause between two consecutive submissions, in nanoseconds
     */
    private void createCars(long sleepNanos) {
        final Stopwatch sw = Stopwatch.createStarted();
        final AtomicLong count = new AtomicLong();

        while (!stopThread) {
            final long id = count.incrementAndGet();

            final WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
            final CarEntry car = new CarEntryBuilder().setId(new CarId("car" + id)).build();
            tx.put(LogicalDatastoreType.CONFIGURATION,
                    InstanceIdentifier.builder(Cars.class).child(CarEntry.class, car.getKey()).build(), car);
            // Fire-and-forget: the commit result is intentionally not awaited so
            // that the submission rate is governed by the sleep below only.
            tx.submit();

            try {
                TimeUnit.NANOSECONDS.sleep(sleepNanos);
            } catch (InterruptedException e) {
                // Interruption is the stop signal; the thread is about to exit,
                // so there is nothing further up the stack to notify.
                break;
            }

            if ((id % PROGRESS_LOG_INTERVAL) == 0) {
                log.info("Cars created {}, time: {}", id, sw.elapsed(TimeUnit.SECONDS));
            }
        }

        log.info("Stress test thread stopping");
    }

    /**
     * Stops the running stress test, if any, and waits for its thread to finish.
     *
     * @return an already-completed future holding a successful result
     */
    @Override
    public Future<RpcResult<Void>> stopStressTest() {
        stopThread();
        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
    }
}