X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2FCarProvider.java;h=1daef6eeb1c5ed386f45af91c584830aff31c83f;hb=refs%2Fchanges%2F03%2F83803%2F53;hp=d70ac410d1535cfaba967e7b063dc8a7f6962df9;hpb=e0d230dfee7bcc7e384e2d8f933854ad449d441e;p=controller.git diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarProvider.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarProvider.java index d70ac410d1..1daef6eeb1 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarProvider.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarProvider.java @@ -9,40 +9,58 @@ package org.opendaylight.controller.clustering.it.provider; import com.google.common.base.Stopwatch; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.Collection; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; -import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException; -import org.opendaylight.controller.md.sal.common.api.clustering.Entity; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeCommitCohortRegistry; import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration; +import org.opendaylight.mdsal.eos.binding.api.Entity; +import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipChange; +import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListener; +import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService; +import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarId; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarService; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarsBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterCommitCohortInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterCommitCohortOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterCommitCohortOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterLoggingDtclInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterLoggingDtclOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterLoggingDtclOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterOwnershipInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterOwnershipOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterOwnershipOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StopStressTestInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StopStressTestOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StopStressTestOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StressTestInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StressTestOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StressTestOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterCommitCohortInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterCommitCohortOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterCommitCohortOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterLoggingDtclsInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterLoggingDtclsOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterLoggingDtclsOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterOwnershipInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterOwnershipOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterOwnershipOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.cars.CarEntry; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.cars.CarEntryBuilder; import org.opendaylight.yangtools.concepts.ListenerRegistration; @@ -55,15 +73,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** + * Implementation of CarService. + * * @author Thomas Pantelis */ +@SuppressFBWarnings("SLF4J_ILLEGAL_PASSED_CLASS") public class CarProvider implements CarService { - private static final Logger log = LoggerFactory.getLogger(PurchaseCarProvider.class); + private static final Logger LOG_PURCHASE_CAR = LoggerFactory.getLogger(PurchaseCarProvider.class); - private static final Logger LOG = LoggerFactory.getLogger(CarProvider.class); + private static final Logger LOG_CAR_PROVIDER = LoggerFactory.getLogger(CarProvider.class); private static final String ENTITY_TYPE = "cars"; - private static final InstanceIdentifier CARS_IID = InstanceIdentifier.builder(Cars.class).build(); + private static final InstanceIdentifier CARS_IID = InstanceIdentifier.builder(Cars.class).build(); private static final DataTreeIdentifier CARS_DTID = new DataTreeIdentifier<>( LogicalDatastoreType.CONFIGURATION, CARS_IID); @@ -76,7 +97,7 @@ public class CarProvider implements CarService { private final CarEntityOwnershipListener ownershipListener = new CarEntityOwnershipListener(); private final AtomicBoolean registeredListener = new AtomicBoolean(); - private final Collection> carsDclRegistrations = + private final Collection> carsDclRegistrations = Sets.newConcurrentHashSet(); private final Collection> carsDtclRegistrations = Sets.newConcurrentHashSet(); @@ -86,8 +107,8 @@ public class CarProvider implements CarService { private final AtomicReference> commitCohortReg = new AtomicReference<>(); - public CarProvider(DataBroker dataProvider, EntityOwnershipService ownershipService, - DOMDataBroker domDataBroker) { + public CarProvider(final DataBroker dataProvider, final EntityOwnershipService ownershipService, + final DOMDataBroker domDataBroker) { this.dataProvider = dataProvider; this.ownershipService = ownershipService; this.domDataBroker = domDataBroker; @@ -95,42 +116,43 @@ public class CarProvider implements CarService { public void close() { stopThread(); - unregisterCommitCohort(); + closeCommitCohortRegistration(); } private void stopThread() { - if(testThread != null) { + if (testThread != null) { stopThread = true; testThread.interrupt(); try { testThread.join(); - } catch (InterruptedException e) {} + } catch (InterruptedException e) { + // don't care + } testThread = null; } } @Override - public Future> stressTest(StressTestInput input) { + public ListenableFuture> stressTest(final StressTestInput input) { final int inputRate; final long inputCount; // If rate is not provided, or given as zero, then just return. - if (input.getRate() == null || input.getRate() == 0) { - log.info("Exiting stress test as no rate is given."); - return Futures.immediateFuture(RpcResultBuilder.failed() + if (input.getRate() == null || input.getRate().toJava() == 0) { + LOG_PURCHASE_CAR.info("Exiting stress test as no rate is given."); + return Futures.immediateFuture(RpcResultBuilder.failed() .withError(ErrorType.PROTOCOL, "invalid rate") .build()); - } else { - inputRate = input.getRate(); } + inputRate = input.getRate().toJava(); if (input.getCount() != null) { - inputCount = input.getCount(); + inputCount = input.getCount().toJava(); } else { inputCount = 0; } - log.info("Stress test starting : rate: {} count: {}", inputRate, inputCount); + LOG_PURCHASE_CAR.info("Stress test starting : rate: {} count: {}", inputRate, inputCount); stopThread(); // clear counters @@ -138,81 +160,76 @@ public class CarProvider implements CarService { failureCounter.set(0); WriteTransaction tx = dataProvider.newWriteOnlyTransaction(); - InstanceIdentifier carsId = InstanceIdentifier.builder(Cars.class).build(); + InstanceIdentifier carsId = InstanceIdentifier.create(Cars.class); tx.merge(LogicalDatastoreType.CONFIGURATION, carsId, new CarsBuilder().build()); try { tx.submit().checkedGet(5, TimeUnit.SECONDS); } catch (TransactionCommitFailedException | TimeoutException e) { - log.error("Put Cars failed",e); - return Futures.immediateFuture(RpcResultBuilder.success().build()); + LOG_PURCHASE_CAR.error("Put Cars failed",e); + return Futures.immediateFuture(RpcResultBuilder.success(new StressTestOutputBuilder().build()).build()); } stopThread = false; final long sleep = TimeUnit.NANOSECONDS.convert(1000,TimeUnit.MILLISECONDS) / inputRate; final Stopwatch sw = Stopwatch.createUnstarted(); - testThread = new Thread() { - @Override - public void run() { - sw.start(); - AtomicLong count = new AtomicLong(); - while(!stopThread) { - long id = count.incrementAndGet(); - WriteTransaction tx = dataProvider.newWriteOnlyTransaction(); - CarEntry car = new CarEntryBuilder().setId(new CarId("car"+id)).build(); - tx.put(LogicalDatastoreType.CONFIGURATION, - InstanceIdentifier.builder(Cars.class).child(CarEntry.class, car.getKey()).build(), - car); - CheckedFuture future = tx.submit(); - Futures.addCallback(future, new FutureCallback() { - - @Override - public void onSuccess(final Void result) { - // Transaction succeeded - succcessCounter.getAndIncrement(); - } - - @Override - public void onFailure(final Throwable t) { - // Transaction failed - failureCounter.getAndIncrement(); - LOG.error("Put Cars failed", t); - } - }); - try { - TimeUnit.NANOSECONDS.sleep(sleep); - } catch (InterruptedException e) { - break; + testThread = new Thread(() -> { + sw.start(); + AtomicLong count = new AtomicLong(); + while (!stopThread) { + long id = count.incrementAndGet(); + WriteTransaction tx1 = dataProvider.newWriteOnlyTransaction(); + CarEntry car = new CarEntryBuilder().setId(new CarId("car" + id)).build(); + tx1.put(LogicalDatastoreType.CONFIGURATION, + InstanceIdentifier.builder(Cars.class).child(CarEntry.class, car.key()).build(), car); + Futures.addCallback(tx1.submit(), new FutureCallback() { + + @Override + public void onSuccess(final Void result) { + // Transaction succeeded + succcessCounter.getAndIncrement(); } - if(count.get() % 1000 == 0) { - log.info("Cars created {}, time: {}",count.get(),sw.elapsed(TimeUnit.SECONDS)); + @Override + public void onFailure(final Throwable ex) { + // Transaction failed + failureCounter.getAndIncrement(); + LOG_CAR_PROVIDER.error("Put Cars failed", ex); } + }, MoreExecutors.directExecutor()); + try { + TimeUnit.NANOSECONDS.sleep(sleep); + } catch (InterruptedException e) { + break; + } - // Check if a count is specified in input and we have created that many cars. - if (inputCount != 0 && count.get() >= inputCount) { - stopThread = true; - } + if (count.get() % 1000 == 0) { + LOG_PURCHASE_CAR.info("Cars created {}, time: {}", count.get(), sw.elapsed(TimeUnit.SECONDS)); } - log.info("Stress test thread stopping after creating {} cars.", count.get()); + // Check if a count is specified in input and we have created that many cars. + if (inputCount != 0 && count.get() >= inputCount) { + stopThread = true; + } } - }; + + LOG_PURCHASE_CAR.info("Stress test thread stopping after creating {} cars.", count.get()); + }); testThread.start(); - return Futures.immediateFuture(RpcResultBuilder.success().build()); + return Futures.immediateFuture(RpcResultBuilder.success(new StressTestOutputBuilder().build()).build()); } @Override - public Future> stopStressTest() { + public ListenableFuture> stopStressTest(final StopStressTestInput input) { stopThread(); StopStressTestOutputBuilder stopStressTestOutput; stopStressTestOutput = new StopStressTestOutputBuilder() .setSuccessCount(succcessCounter.longValue()) .setFailureCount(failureCounter.longValue()); - StopStressTestOutput result = stopStressTestOutput.build(); - log.info("Executed Stop Stress test; No. of cars created {}; " + - "No. of cars failed {}; ", succcessCounter, failureCounter); + final StopStressTestOutput result = stopStressTestOutput.build(); + LOG_PURCHASE_CAR.info("Executed Stop Stress test; No. of cars created {}; " + + "No. of cars failed {}; ", succcessCounter, failureCounter); // clear counters succcessCounter.set(0); failureCounter.set(0); @@ -221,8 +238,8 @@ public class CarProvider implements CarService { @Override - public Future> registerOwnership(RegisterOwnershipInput input) { - if(registeredListener.compareAndSet(false, true)) { + public ListenableFuture> registerOwnership(final RegisterOwnershipInput input) { + if (registeredListener.compareAndSet(false, true)) { ownershipService.registerListener(ENTITY_TYPE, ownershipListener); } @@ -230,70 +247,41 @@ public class CarProvider implements CarService { try { ownershipService.registerCandidate(entity); } catch (CandidateAlreadyRegisteredException e) { - return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, + return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "Could not register for car " + input.getCarId(), e).buildFuture(); } - return RpcResultBuilder.success().buildFuture(); + return RpcResultBuilder.success(new RegisterOwnershipOutputBuilder().build()).buildFuture(); } @Override - public Future> unregisterOwnership(UnregisterOwnershipInput input) { - return RpcResultBuilder.success().buildFuture(); + public ListenableFuture> unregisterOwnership( + final UnregisterOwnershipInput input) { + return RpcResultBuilder.success(new UnregisterOwnershipOutputBuilder().build()).buildFuture(); } private static class CarEntityOwnershipListener implements EntityOwnershipListener { @Override - public void ownershipChanged(EntityOwnershipChange ownershipChange) { - LOG.info("ownershipChanged: {}", ownershipChange); + public void ownershipChanged(final EntityOwnershipChange ownershipChange) { + LOG_CAR_PROVIDER.info("ownershipChanged: {}", ownershipChange); } } @Override - public Future> registerLoggingDcl() { - LOG.info("Registering a new CarDataChangeListener"); - final ListenerRegistration carsDclRegistration = dataProvider.registerDataChangeListener( - LogicalDatastoreType.CONFIGURATION, CARS_IID, new CarDataChangeListener(), - AsyncDataBroker.DataChangeScope.SUBTREE); - - if (carsDclRegistration != null) { - carsDclRegistrations.add(carsDclRegistration); - return RpcResultBuilder.success().buildFuture(); - } - return RpcResultBuilder.failed().buildFuture(); - } - - @Override - public Future> registerLoggingDtcl() { - LOG.info("Registering a new CarDataTreeChangeListener"); + public ListenableFuture> registerLoggingDtcl( + final RegisterLoggingDtclInput input) { + LOG_CAR_PROVIDER.info("Registering a new CarDataTreeChangeListener"); final ListenerRegistration carsDtclRegistration = dataProvider.registerDataTreeChangeListener(CARS_DTID, new CarDataTreeChangeListener()); - if (carsDtclRegistration != null) { - carsDtclRegistrations.add(carsDtclRegistration); - return RpcResultBuilder.success().buildFuture(); - } - return RpcResultBuilder.failed().buildFuture(); + carsDtclRegistrations.add(carsDtclRegistration); + return RpcResultBuilder.success(new RegisterLoggingDtclOutputBuilder().build()).buildFuture(); } @Override - public Future> unregisterLoggingDcls() { - LOG.info("Unregistering the CarDataChangeListener(s)"); - synchronized (carsDclRegistrations) { - int numListeners = 0; - for (ListenerRegistration carsDclRegistration : carsDclRegistrations) { - carsDclRegistration.close(); - numListeners++; - } - carsDclRegistrations.clear(); - LOG.info("Unregistered {} CarDataChangeListener(s)", numListeners); - } - return RpcResultBuilder.success().buildFuture(); - } - - @Override - public Future> unregisterLoggingDtcls() { - LOG.info("Unregistering the CarDataTreeChangeListener(s)"); + public ListenableFuture> unregisterLoggingDtcls( + final UnregisterLoggingDtclsInput input) { + LOG_CAR_PROVIDER.info("Unregistering the CarDataTreeChangeListener(s)"); synchronized (carsDtclRegistrations) { int numListeners = 0; for (ListenerRegistration carsDtclRegistration : carsDtclRegistrations) { @@ -301,32 +289,33 @@ public class CarProvider implements CarService { numListeners++; } carsDtclRegistrations.clear(); - LOG.info("Unregistered {} CaraDataTreeChangeListener(s)", numListeners); + LOG_CAR_PROVIDER.info("Unregistered {} CaraDataTreeChangeListener(s)", numListeners); } - return RpcResultBuilder.success().buildFuture(); + return RpcResultBuilder.success(new UnregisterLoggingDtclsOutputBuilder().build()).buildFuture(); } @Override @SuppressWarnings("checkstyle:IllegalCatch") - public Future> unregisterCommitCohort() { + public ListenableFuture> unregisterCommitCohort( + final UnregisterCommitCohortInput input) { + closeCommitCohortRegistration(); + + return RpcResultBuilder.success(new UnregisterCommitCohortOutputBuilder().build()).buildFuture(); + } + + private void closeCommitCohortRegistration() { final DOMDataTreeCommitCohortRegistration reg = commitCohortReg.getAndSet(null); if (reg != null) { - try { - reg.close(); - LOG.info("Unregistered commit cohort"); - } catch (Exception e) { - return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, - "Error closing commit cohort registration", e).buildFuture(); - } + reg.close(); + LOG_CAR_PROVIDER.info("Unregistered commit cohort"); } - - return RpcResultBuilder.success().buildFuture(); } @Override - public synchronized Future> registerCommitCohort() { + public synchronized ListenableFuture> registerCommitCohort( + final RegisterCommitCohortInput input) { if (commitCohortReg.get() != null) { - return RpcResultBuilder.success().buildFuture(); + return RpcResultBuilder.success(new RegisterCommitCohortOutputBuilder().build()).buildFuture(); } final DOMDataTreeCommitCohortRegistry commitCohortRegistry = (DOMDataTreeCommitCohortRegistry) @@ -334,7 +323,7 @@ public class CarProvider implements CarService { if (commitCohortRegistry == null) { // Shouldn't happen - return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, + return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "DOMDataTreeCommitCohortRegistry not found").buildFuture(); } @@ -351,8 +340,8 @@ public class CarProvider implements CarService { org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION, carEntryPath), new CarEntryDataTreeCommitCohort())); - LOG.info("Registered commit cohort"); + LOG_CAR_PROVIDER.info("Registered commit cohort"); - return RpcResultBuilder.success().buildFuture(); + return RpcResultBuilder.success(new RegisterCommitCohortOutputBuilder().build()).buildFuture(); } }