2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.clustering.it.provider;
10 import com.google.common.base.Stopwatch;
11 import com.google.common.collect.Sets;
12 import com.google.common.util.concurrent.Futures;
13 import java.util.Collection;
14 import java.util.concurrent.Future;
15 import java.util.concurrent.TimeUnit;
16 import java.util.concurrent.TimeoutException;
17 import java.util.concurrent.atomic.AtomicBoolean;
18 import java.util.concurrent.atomic.AtomicLong;
19 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
20 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
21 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
22 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
23 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
24 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
25 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
26 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
27 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
29 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarId;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarService;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarsBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterOwnershipInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StressTestInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterOwnershipInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.cars.CarEntry;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.cars.CarEntryBuilder;
40 import org.opendaylight.yangtools.concepts.ListenerRegistration;
41 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
42 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
43 import org.opendaylight.yangtools.yang.common.RpcResult;
44 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
49 * @author Thomas Pantelis
51 public class CarProvider implements CarService {
52 private static final Logger log = LoggerFactory.getLogger(PurchaseCarProvider.class);
54 private final DataBroker dataProvider;
55 private final EntityOwnershipService ownershipService;
56 private static final Logger LOG = LoggerFactory.getLogger(CarProvider.class);
58 private static final String ENTITY_TYPE = "cars";
60 private final CarEntityOwnershipListener ownershipListener = new CarEntityOwnershipListener();
61 private final AtomicBoolean registeredListener = new AtomicBoolean();
63 private volatile Thread testThread;
64 private volatile boolean stopThread;
66 private static final InstanceIdentifier CARS_IID = InstanceIdentifier.builder(Cars.class).build();
67 private static final DataTreeIdentifier<Cars> CARS_DTID = new DataTreeIdentifier<>(
68 LogicalDatastoreType.CONFIGURATION, CARS_IID);
70 private Collection<ListenerRegistration<DataChangeListener>> carsDclRegistrations =
71 Sets.newConcurrentHashSet();
72 private Collection<ListenerRegistration<CarDataTreeChangeListener>> carsDtclRegistrations =
73 Sets.newConcurrentHashSet();
/**
 * Creates the provider with the services it delegates to.
 *
 * @param dataProvider broker used to open write transactions and register data listeners
 * @param ownershipService service used to register the ownership listener and candidates
 */
public CarProvider(DataBroker dataProvider, EntityOwnershipService ownershipService) {
    this.ownershipService = ownershipService;
    this.dataProvider = dataProvider;
}
// Stops the stress-test worker: when a test thread is set, it is interrupted.
// NOTE(review): this listing embeds the original file's line numbers and omits original
// lines 82, 84-85 and 87-89, including the try block that the catch below belongs to, so
// the rest of the method cannot be described from here - confirm against the full file.
80 private void stopThread() {
81 if(testThread != null) {
83 testThread.interrupt();
// NOTE(review): the InterruptedException is silently discarded and the caller's interrupt
// status is not restored - consider Thread.currentThread().interrupt() in this handler.
86 } catch (InterruptedException e) {}
// Stress-test RPC: rejects a missing or zero rate, merges a Cars container into the
// CONFIGURATION datastore, then builds a worker thread that writes "car<N>" entries.
// NOTE(review): this listing embeds the original file's line numbers and omits a number of
// lines (for example the declaration of inputRate, the "try" openings that precede the catch
// clauses, the worker's loop header and whatever starts the thread). The comments below
// describe only the statements that are visible - confirm the rest against the full file.
92 public Future<RpcResult<Void>> stressTest(StressTestInput input) {
94 final long inputCount;
96 // If rate is not provided, or given as zero, then just return.
97 if ((input.getRate() == null) || (input.getRate() == 0)) {
98 log.info("Exiting stress test as no rate is given.");
// The rejection is reported as a failed result carrying a PROTOCOL "invalid rate" error.
99 return Futures.immediateFuture(RpcResultBuilder.<Void>failed()
100 .withError(ErrorType.PROTOCOL, "invalid rate")
103 inputRate = input.getRate();
// The count is optional; it is only copied from the input when one was supplied.
106 if (input.getCount() != null) {
107 inputCount = input.getCount();
112 log.info("Stress test starting : rate: {} count: {}", inputRate, inputCount);
// Merge a Cars container (built with no fields set) before any car entries are written.
116 WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
117 InstanceIdentifier<Cars> carsId = InstanceIdentifier.<Cars>builder(Cars.class).build();
118 tx.merge(LogicalDatastoreType.CONFIGURATION, carsId, new CarsBuilder().build());
// Wait at most 5 seconds for the commit to complete.
120 tx.submit().checkedGet(5, TimeUnit.SECONDS);
121 } catch (TransactionCommitFailedException | TimeoutException e) {
122 log.error("Put Cars failed",e);
// NOTE(review): a failed or timed-out commit is logged as an error, yet the RPC returns a
// *successful* result here - this looks like it should be a failed result; confirm intent.
123 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
// Delay between writes in nanoseconds: one second divided by the requested rate.
127 final long sleep = TimeUnit.NANOSECONDS.convert(1000,TimeUnit.MILLISECONDS) / inputRate;
// Created unstarted; the call that starts it is not visible in this listing.
128 final Stopwatch sw = Stopwatch.createUnstarted();
129 testThread = new Thread() {
// Number of cars created so far; also the source of each car's numeric suffix.
133 AtomicLong count = new AtomicLong();
135 long id = count.incrementAndGet();
// Each car is written with its own write-only transaction, at the path keyed by the car's id.
136 WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
137 CarEntry car = new CarEntryBuilder().setId(new CarId("car"+id)).build();
138 tx.put(LogicalDatastoreType.CONFIGURATION,
139 InstanceIdentifier.<Cars>builder(Cars.class).child(CarEntry.class, car.getKey()).build(),
// Pace the writes using the delay computed above.
143 TimeUnit.NANOSECONDS.sleep(sleep);
144 } catch (InterruptedException e) {
// Progress is logged once every 1000 created cars, with the stopwatch's elapsed seconds.
148 if((count.get() % 1000) == 0) {
149 log.info("Cars created {}, time: {}",count.get(),sw.elapsed(TimeUnit.SECONDS));
152 // Check if a count is specified in input and we have created that many cars.
// A count of 0 never satisfies this condition, i.e. it does not limit the number of cars.
153 if ((inputCount != 0) && (count.get() >= inputCount)) {
158 log.info("Stress test thread stopping after creating {} cars.", count.get());
// The RPC answers with an already-completed successful result.
163 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
// RPC that ends the stress test; it answers with an already-completed successful result.
// NOTE(review): original line 168 is absent from this listing (presumably the call that
// stops the worker thread) - confirm against the full file.
167 public Future<RpcResult<Void>> stopStressTest() {
169 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
174 public Future<RpcResult<Void>> registerOwnership(RegisterOwnershipInput input) {
175 if(registeredListener.compareAndSet(false, true)) {
176 ownershipService.registerListener(ENTITY_TYPE, ownershipListener);
179 Entity entity = new Entity(ENTITY_TYPE, input.getCarId());
181 ownershipService.registerCandidate(entity);
182 } catch (CandidateAlreadyRegisteredException e) {
183 return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION,
184 "Could not register for car " + input.getCarId(), e).buildFuture();
187 return RpcResultBuilder.<Void>success().buildFuture();
191 public Future<RpcResult<Void>> unregisterOwnership(UnregisterOwnershipInput input) {
192 return RpcResultBuilder.<Void>success().buildFuture();
195 private static class CarEntityOwnershipListener implements EntityOwnershipListener {
197 public void ownershipChanged(EntityOwnershipChange ownershipChange) {
198 LOG.info("ownershipChanged: {}", ownershipChange);
203 public Future<RpcResult<java.lang.Void>> registerLoggingDcl() {
204 LOG.info("Registering a new CarDataChangeListener");
205 final ListenerRegistration carsDclRegistration = dataProvider.registerDataChangeListener(
206 LogicalDatastoreType.CONFIGURATION, CARS_IID, new CarDataChangeListener(),
207 AsyncDataBroker.DataChangeScope.SUBTREE);
209 if (carsDclRegistration != null) {
210 carsDclRegistrations.add(carsDclRegistration);
211 return RpcResultBuilder.<Void>success().buildFuture();
213 return RpcResultBuilder.<Void>failed().buildFuture();
217 public Future<RpcResult<java.lang.Void>> registerLoggingDtcl() {
218 LOG.info("Registering a new CarDataTreeChangeListener");
219 final ListenerRegistration<CarDataTreeChangeListener> carsDtclRegistration =
220 dataProvider.registerDataTreeChangeListener(CARS_DTID, new CarDataTreeChangeListener());
222 if (carsDtclRegistration != null) {
223 carsDtclRegistrations.add(carsDtclRegistration);
224 return RpcResultBuilder.<Void>success().buildFuture();
226 return RpcResultBuilder.<Void>failed().buildFuture();
230 public Future<RpcResult<java.lang.Void>> unregisterLoggingDcls() {
231 LOG.info("Unregistering the CarDataChangeListener(s)");
232 synchronized (carsDclRegistrations) {
233 int numListeners = 0;
234 for (ListenerRegistration<DataChangeListener> carsDclRegistration : carsDclRegistrations) {
235 carsDclRegistration.close();
238 carsDclRegistrations.clear();
239 LOG.info("Unregistered {} CarDataChangeListener(s)", numListeners);
241 return RpcResultBuilder.<Void>success().buildFuture();
245 public Future<RpcResult<java.lang.Void>> unregisterLoggingDtcls() {
246 LOG.info("Unregistering the CarDataTreeChangeListener(s)");
247 synchronized (carsDtclRegistrations) {
248 int numListeners = 0;
249 for (ListenerRegistration<CarDataTreeChangeListener> carsDtclRegistration : carsDtclRegistrations) {
250 carsDtclRegistration.close();
253 carsDtclRegistrations.clear();
254 LOG.info("Unregistered {} CaraDataTreeChangeListener(s)", numListeners);
256 return RpcResultBuilder.<Void>success().buildFuture();