X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2FCarProvider.java;h=ec4c1bb1618e0f79f3a3848fa9b2850e2ee06da9;hp=1ab7f5e9016c30a29ab493c978824f6e2b85c44a;hb=f6d4d8759fcce4d35f38f5a5a056acf1d3e0b4b2;hpb=f36e0782a5bb5409d8dd95e2d08ffdbd65266663 diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarProvider.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarProvider.java index 1ab7f5e901..ec4c1bb161 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarProvider.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/CarProvider.java @@ -8,34 +8,49 @@ package org.opendaylight.controller.clustering.it.provider; import com.google.common.base.Stopwatch; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import java.util.Collection; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; +import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException; import org.opendaylight.controller.md.sal.common.api.clustering.Entity; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeCommitCohortRegistry; +import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarId; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarService; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarsBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterOwnershipInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StopStressTestOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StopStressTestOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StressTestInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterOwnershipInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.cars.CarEntry; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.cars.CarEntryBuilder; +import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,23 +58,44 @@ import org.slf4j.LoggerFactory; * @author Thomas Pantelis */ public class CarProvider implements CarService { - private static final Logger log = LoggerFactory.getLogger(PurchaseCarProvider.class); + private static final Logger LOG_PURCHASE_CAR = LoggerFactory.getLogger(PurchaseCarProvider.class); - private final DataBroker dataProvider; - private final EntityOwnershipService ownershipService; - private static final Logger LOG = LoggerFactory.getLogger(CarProvider.class); + private static final Logger LOG_CAR_PROVIDER = LoggerFactory.getLogger(CarProvider.class); private static final String ENTITY_TYPE = "cars"; + private static final InstanceIdentifier CARS_IID = InstanceIdentifier.builder(Cars.class).build(); + private static final DataTreeIdentifier CARS_DTID = new DataTreeIdentifier<>( + LogicalDatastoreType.CONFIGURATION, CARS_IID); + + private final DataBroker dataProvider; + private final DOMDataBroker domDataBroker; + private final EntityOwnershipService ownershipService; + private final AtomicLong succcessCounter = new AtomicLong(); + private final AtomicLong failureCounter = new AtomicLong(); private final CarEntityOwnershipListener ownershipListener = new CarEntityOwnershipListener(); private final AtomicBoolean registeredListener = new AtomicBoolean(); + private final Collection> carsDclRegistrations = + Sets.newConcurrentHashSet(); + private final Collection> carsDtclRegistrations = + Sets.newConcurrentHashSet(); + private volatile Thread testThread; private volatile boolean stopThread; + private final AtomicReference> commitCohortReg = + new AtomicReference<>(); - public CarProvider(DataBroker dataProvider, EntityOwnershipService ownershipService) { + public CarProvider(DataBroker dataProvider, EntityOwnershipService ownershipService, + DOMDataBroker domDataBroker) { this.dataProvider = dataProvider; this.ownershipService = ownershipService; + this.domDataBroker = domDataBroker; + } + + public void close() { + stopThread(); + unregisterCommitCohort(); } private void stopThread() { @@ -79,11 +115,11 @@ public class CarProvider implements CarService { final long inputCount; // If rate is not provided, or given as zero, then just return. - if ((input.getRate() == null) || (input.getRate() == 0)) { - log.info("Exiting stress test as no rate is given."); + if (input.getRate() == null || input.getRate() == 0) { + LOG_PURCHASE_CAR.info("Exiting stress test as no rate is given."); return Futures.immediateFuture(RpcResultBuilder.failed() - .withError(ErrorType.PROTOCOL, "invalid rate") - .build()); + .withError(ErrorType.PROTOCOL, "invalid rate") + .build()); } else { inputRate = input.getRate(); } @@ -94,9 +130,12 @@ public class CarProvider implements CarService { inputCount = 0; } - log.info("Stress test starting : rate: {} count: {}", inputRate, inputCount); + LOG_PURCHASE_CAR.info("Stress test starting : rate: {} count: {}", inputRate, inputCount); stopThread(); + // clear counters + succcessCounter.set(0); + failureCounter.set(0); WriteTransaction tx = dataProvider.newWriteOnlyTransaction(); InstanceIdentifier carsId = InstanceIdentifier.builder(Cars.class).build(); @@ -104,54 +143,77 @@ public class CarProvider implements CarService { try { tx.submit().checkedGet(5, TimeUnit.SECONDS); } catch (TransactionCommitFailedException | TimeoutException e) { - log.error("Put Cars failed",e); + LOG_PURCHASE_CAR.error("Put Cars failed",e); return Futures.immediateFuture(RpcResultBuilder.success().build()); } stopThread = false; final long sleep = TimeUnit.NANOSECONDS.convert(1000,TimeUnit.MILLISECONDS) / inputRate; final Stopwatch sw = Stopwatch.createUnstarted(); - testThread = new Thread() { - @Override - public void run() { - sw.start(); - AtomicLong count = new AtomicLong(); - while(!stopThread) { - long id = count.incrementAndGet(); - WriteTransaction tx = dataProvider.newWriteOnlyTransaction(); - CarEntry car = new CarEntryBuilder().setId(new CarId("car"+id)).build(); - tx.put(LogicalDatastoreType.CONFIGURATION, - InstanceIdentifier.builder(Cars.class).child(CarEntry.class, car.getKey()).build(), - car); - tx.submit(); - try { - TimeUnit.NANOSECONDS.sleep(sleep); - } catch (InterruptedException e) { - break; - } + testThread = new Thread(() -> { + sw.start(); + AtomicLong count = new AtomicLong(); + while(!stopThread) { + long id = count.incrementAndGet(); + WriteTransaction tx1 = dataProvider.newWriteOnlyTransaction(); + CarEntry car = new CarEntryBuilder().setId(new CarId("car"+id)).build(); + tx1.put(LogicalDatastoreType.CONFIGURATION, + InstanceIdentifier.builder(Cars.class).child(CarEntry.class, car.getKey()).build(), + car); + CheckedFuture future = tx1.submit(); + Futures.addCallback(future, new FutureCallback() { - if((count.get() % 1000) == 0) { - log.info("Cars created {}, time: {}",count.get(),sw.elapsed(TimeUnit.SECONDS)); + @Override + public void onSuccess(final Void result) { + // Transaction succeeded + succcessCounter.getAndIncrement(); } - // Check if a count is specified in input and we have created that many cars. - if ((inputCount != 0) && (count.get() >= inputCount)) { - stopThread = true; + @Override + public void onFailure(final Throwable t) { + // Transaction failed + failureCounter.getAndIncrement(); + LOG_CAR_PROVIDER.error("Put Cars failed", t); } + }); + try { + TimeUnit.NANOSECONDS.sleep(sleep); + } catch (InterruptedException e) { + break; + } + + if(count.get() % 1000 == 0) { + LOG_PURCHASE_CAR.info("Cars created {}, time: {}",count.get(),sw.elapsed(TimeUnit.SECONDS)); } - log.info("Stress test thread stopping after creating {} cars.", count.get()); + // Check if a count is specified in input and we have created that many cars. + if (inputCount != 0 && count.get() >= inputCount) { + stopThread = true; + } } - }; + + LOG_PURCHASE_CAR.info("Stress test thread stopping after creating {} cars.", count.get()); + }); testThread.start(); return Futures.immediateFuture(RpcResultBuilder.success().build()); } @Override - public Future> stopStressTest() { + public Future> stopStressTest() { stopThread(); - return Futures.immediateFuture(RpcResultBuilder.success().build()); + StopStressTestOutputBuilder stopStressTestOutput; + stopStressTestOutput = new StopStressTestOutputBuilder() + .setSuccessCount(succcessCounter.longValue()) + .setFailureCount(failureCounter.longValue()); + + StopStressTestOutput result = stopStressTestOutput.build(); + LOG_PURCHASE_CAR.info("Executed Stop Stress test; No. of cars created {}; " + + "No. of cars failed {}; ", succcessCounter, failureCounter); + // clear counters + succcessCounter.set(0); + failureCounter.set(0); + return Futures.immediateFuture(RpcResultBuilder.success(result).build()); } @@ -180,7 +242,114 @@ public class CarProvider implements CarService { private static class CarEntityOwnershipListener implements EntityOwnershipListener { @Override public void ownershipChanged(EntityOwnershipChange ownershipChange) { - LOG.info("ownershipChanged: {}", ownershipChange); + LOG_CAR_PROVIDER.info("ownershipChanged: {}", ownershipChange); + } + } + + @Override + public Future> registerLoggingDcl() { + LOG_CAR_PROVIDER.info("Registering a new CarDataChangeListener"); + final ListenerRegistration carsDclRegistration = dataProvider.registerDataChangeListener( + LogicalDatastoreType.CONFIGURATION, CARS_IID, new CarDataChangeListener(), + AsyncDataBroker.DataChangeScope.SUBTREE); + + if (carsDclRegistration != null) { + carsDclRegistrations.add(carsDclRegistration); + return RpcResultBuilder.success().buildFuture(); + } + return RpcResultBuilder.failed().buildFuture(); + } + + @Override + public Future> registerLoggingDtcl() { + LOG_CAR_PROVIDER.info("Registering a new CarDataTreeChangeListener"); + final ListenerRegistration carsDtclRegistration = + dataProvider.registerDataTreeChangeListener(CARS_DTID, new CarDataTreeChangeListener()); + + if (carsDtclRegistration != null) { + carsDtclRegistrations.add(carsDtclRegistration); + return RpcResultBuilder.success().buildFuture(); + } + return RpcResultBuilder.failed().buildFuture(); + } + + @Override + public Future> unregisterLoggingDcls() { + LOG_CAR_PROVIDER.info("Unregistering the CarDataChangeListener(s)"); + synchronized (carsDclRegistrations) { + int numListeners = 0; + for (ListenerRegistration carsDclRegistration : carsDclRegistrations) { + carsDclRegistration.close(); + numListeners++; + } + carsDclRegistrations.clear(); + LOG_CAR_PROVIDER.info("Unregistered {} CarDataChangeListener(s)", numListeners); + } + return RpcResultBuilder.success().buildFuture(); + } + + @Override + public Future> unregisterLoggingDtcls() { + LOG_CAR_PROVIDER.info("Unregistering the CarDataTreeChangeListener(s)"); + synchronized (carsDtclRegistrations) { + int numListeners = 0; + for (ListenerRegistration carsDtclRegistration : carsDtclRegistrations) { + carsDtclRegistration.close(); + numListeners++; + } + carsDtclRegistrations.clear(); + LOG_CAR_PROVIDER.info("Unregistered {} CaraDataTreeChangeListener(s)", numListeners); + } + return RpcResultBuilder.success().buildFuture(); + } + + @Override + @SuppressWarnings("checkstyle:IllegalCatch") + public Future> unregisterCommitCohort() { + final DOMDataTreeCommitCohortRegistration reg = commitCohortReg.getAndSet(null); + if (reg != null) { + try { + reg.close(); + LOG_CAR_PROVIDER.info("Unregistered commit cohort"); + } catch (Exception e) { + return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, + "Error closing commit cohort registration", e).buildFuture(); + } } + + return RpcResultBuilder.success().buildFuture(); + } + + @Override + public synchronized Future> registerCommitCohort() { + if (commitCohortReg.get() != null) { + return RpcResultBuilder.success().buildFuture(); + } + + final DOMDataTreeCommitCohortRegistry commitCohortRegistry = (DOMDataTreeCommitCohortRegistry) + domDataBroker.getSupportedExtensions().get(DOMDataTreeCommitCohortRegistry.class); + + if (commitCohortRegistry == null) { + // Shouldn't happen + return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, + "DOMDataTreeCommitCohortRegistry not found").buildFuture(); + } + + // Note: it may look strange that we specify the CarEntry.QNAME twice in the path below. This must be done in + // order to register the commit cohort for CarEntry instances. In the underlying data tree, a yang list is + // represented as a MapNode with MapEntryNodes representing the child list entries. Therefore, in order to + // address a list entry, you must specify the path argument for the MapNode and the path argument for the + // MapEntryNode. In the path below, the first CarEntry.QNAME argument addresses the MapNode and, since we want + // to address all list entries, the second path argument is wild-carded by specifying just the CarEntry.QNAME. + final YangInstanceIdentifier carEntryPath = YangInstanceIdentifier.builder( + YangInstanceIdentifier.of(Cars.QNAME)).node(CarEntry.QNAME).node(CarEntry.QNAME).build(); + commitCohortReg.set(commitCohortRegistry.registerCommitCohort( + new org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier( + org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION, + carEntryPath), new CarEntryDataTreeCommitCohort())); + + LOG_CAR_PROVIDER.info("Registered commit cohort"); + + return RpcResultBuilder.success().buildFuture(); } }