Bug 6348 : car:stop-stress-test RPC to return success & failure counters
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / CarProvider.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.clustering.it.provider;
9
10 import com.google.common.base.Stopwatch;
11 import com.google.common.collect.Sets;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import java.util.Collection;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.TimeoutException;
19 import java.util.concurrent.atomic.AtomicBoolean;
20 import java.util.concurrent.atomic.AtomicLong;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
23 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
24 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
25 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
26 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
27 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
28 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
29 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarId;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarsBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterOwnershipInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StopStressTestOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StopStressTestOutputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StressTestInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterOwnershipInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.cars.CarEntry;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.cars.CarEntryBuilder;
44 import org.opendaylight.yangtools.concepts.ListenerRegistration;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 /**
53  * @author Thomas Pantelis
54  */
55 public class CarProvider implements CarService {
56     private static final Logger log = LoggerFactory.getLogger(PurchaseCarProvider.class);
57
58     private final DataBroker dataProvider;
59     private final EntityOwnershipService ownershipService;
60     private static final Logger LOG = LoggerFactory.getLogger(CarProvider.class);
61
62     private static final String ENTITY_TYPE = "cars";
63
64     private AtomicLong succcessCounter = new AtomicLong();
65     private AtomicLong failureCounter = new AtomicLong();
66
67     private final CarEntityOwnershipListener ownershipListener = new CarEntityOwnershipListener();
68     private final AtomicBoolean registeredListener = new AtomicBoolean();
69
70     private volatile Thread testThread;
71     private volatile boolean stopThread;
72
73     private static final InstanceIdentifier CARS_IID = InstanceIdentifier.builder(Cars.class).build();
74     private static final DataTreeIdentifier<Cars> CARS_DTID = new DataTreeIdentifier<>(
75             LogicalDatastoreType.CONFIGURATION, CARS_IID);
76
77     private Collection<ListenerRegistration<DataChangeListener>> carsDclRegistrations =
78             Sets.newConcurrentHashSet();
79     private Collection<ListenerRegistration<CarDataTreeChangeListener>> carsDtclRegistrations =
80             Sets.newConcurrentHashSet();
81
82     public CarProvider(DataBroker dataProvider, EntityOwnershipService ownershipService) {
83         this.dataProvider = dataProvider;
84         this.ownershipService = ownershipService;
85     }
86
87     private void stopThread() {
88         if(testThread != null) {
89             stopThread = true;
90             testThread.interrupt();
91             try {
92                 testThread.join();
93             } catch (InterruptedException e) {}
94             testThread = null;
95         }
96     }
97
98     @Override
99     public Future<RpcResult<Void>> stressTest(StressTestInput input) {
100         final int inputRate;
101         final long inputCount;
102
103         // If rate is not provided, or given as zero, then just return.
104         if ((input.getRate() == null) || (input.getRate() == 0)) {
105             log.info("Exiting stress test as no rate is given.");
106             return Futures.immediateFuture(RpcResultBuilder.<Void>failed()
107                     .withError(ErrorType.PROTOCOL, "invalid rate")
108                     .build());
109         } else {
110             inputRate = input.getRate();
111         }
112
113         if (input.getCount() != null) {
114             inputCount = input.getCount();
115         } else {
116             inputCount = 0;
117         }
118
119         log.info("Stress test starting : rate: {} count: {}", inputRate, inputCount);
120
121         stopThread();
122         // clear counters
123         succcessCounter.set(0);
124         failureCounter.set(0);
125
126         WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
127         InstanceIdentifier<Cars> carsId = InstanceIdentifier.<Cars>builder(Cars.class).build();
128         tx.merge(LogicalDatastoreType.CONFIGURATION, carsId, new CarsBuilder().build());
129         try {
130             tx.submit().checkedGet(5, TimeUnit.SECONDS);
131         } catch (TransactionCommitFailedException | TimeoutException e) {
132             log.error("Put Cars failed",e);
133             return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
134         }
135
136         stopThread = false;
137         final long sleep = TimeUnit.NANOSECONDS.convert(1000,TimeUnit.MILLISECONDS) / inputRate;
138         final Stopwatch sw = Stopwatch.createUnstarted();
139         testThread = new Thread() {
140             @Override
141             public void run() {
142                 sw.start();
143                 AtomicLong count = new AtomicLong();
144                 while(!stopThread) {
145                     long id = count.incrementAndGet();
146                     WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
147                     CarEntry car = new CarEntryBuilder().setId(new CarId("car"+id)).build();
148                     tx.put(LogicalDatastoreType.CONFIGURATION,
149                             InstanceIdentifier.<Cars>builder(Cars.class).child(CarEntry.class, car.getKey()).build(),
150                             car);
151                     CheckedFuture<Void, TransactionCommitFailedException> future =  tx.submit();
152                     Futures.addCallback(future, new FutureCallback<Void>() {
153
154                         @Override
155                         public void onSuccess(final Void result) {
156                             // Transaction succeeded
157                             succcessCounter.getAndIncrement();
158                         }
159
160                         @Override
161                         public void onFailure(final Throwable t) {
162                             // Transaction failed
163                             failureCounter.getAndIncrement();
164                             LOG.error("Put Cars failed", t);
165                         }
166                     });
167                     try {
168                         TimeUnit.NANOSECONDS.sleep(sleep);
169                     } catch (InterruptedException e) {
170                         break;
171                     }
172
173                     if((count.get() % 1000) == 0) {
174                         log.info("Cars created {}, time: {}",count.get(),sw.elapsed(TimeUnit.SECONDS));
175                     }
176
177                     // Check if a count is specified in input and we have created that many cars.
178                     if ((inputCount != 0) && (count.get() >= inputCount)) {
179                         stopThread = true;
180                     }
181                 }
182
183                 log.info("Stress test thread stopping after creating {} cars.", count.get());
184             }
185         };
186         testThread.start();
187
188         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
189     }
190
191     @Override
192     public Future<RpcResult<StopStressTestOutput>> stopStressTest() {
193         stopThread();
194         StopStressTestOutputBuilder stopStressTestOutput;
195         stopStressTestOutput = new StopStressTestOutputBuilder()
196                 .setSuccessCount(succcessCounter.longValue())
197                 .setFailureCount(failureCounter.longValue());
198
199         StopStressTestOutput result = stopStressTestOutput.build();
200         log.info("Executed Stop Stress test; No. of cars created {}; " +
201                 "No. of cars failed {}; ", succcessCounter, failureCounter);
202         // clear counters
203         succcessCounter.set(0);
204         failureCounter.set(0);
205         return Futures.immediateFuture(RpcResultBuilder.<StopStressTestOutput>success(result).build());
206     }
207
208
209     @Override
210     public Future<RpcResult<Void>> registerOwnership(RegisterOwnershipInput input) {
211         if(registeredListener.compareAndSet(false, true)) {
212             ownershipService.registerListener(ENTITY_TYPE, ownershipListener);
213         }
214
215         Entity entity = new Entity(ENTITY_TYPE, input.getCarId());
216         try {
217             ownershipService.registerCandidate(entity);
218         } catch (CandidateAlreadyRegisteredException e) {
219             return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION,
220                     "Could not register for car " + input.getCarId(), e).buildFuture();
221         }
222
223         return RpcResultBuilder.<Void>success().buildFuture();
224     }
225
226     @Override
227     public Future<RpcResult<Void>> unregisterOwnership(UnregisterOwnershipInput input) {
228         return RpcResultBuilder.<Void>success().buildFuture();
229     }
230
231     private static class CarEntityOwnershipListener implements EntityOwnershipListener {
232         @Override
233         public void ownershipChanged(EntityOwnershipChange ownershipChange) {
234             LOG.info("ownershipChanged: {}", ownershipChange);
235         }
236     }
237
238     @Override
239     public Future<RpcResult<java.lang.Void>> registerLoggingDcl() {
240         LOG.info("Registering a new CarDataChangeListener");
241         final ListenerRegistration carsDclRegistration = dataProvider.registerDataChangeListener(
242                 LogicalDatastoreType.CONFIGURATION, CARS_IID, new CarDataChangeListener(),
243                 AsyncDataBroker.DataChangeScope.SUBTREE);
244
245         if (carsDclRegistration != null) {
246             carsDclRegistrations.add(carsDclRegistration);
247             return RpcResultBuilder.<Void>success().buildFuture();
248         }
249         return RpcResultBuilder.<Void>failed().buildFuture();
250     }
251
252     @Override
253     public Future<RpcResult<java.lang.Void>> registerLoggingDtcl() {
254         LOG.info("Registering a new CarDataTreeChangeListener");
255         final ListenerRegistration<CarDataTreeChangeListener> carsDtclRegistration =
256                 dataProvider.registerDataTreeChangeListener(CARS_DTID, new CarDataTreeChangeListener());
257
258         if (carsDtclRegistration != null) {
259             carsDtclRegistrations.add(carsDtclRegistration);
260             return RpcResultBuilder.<Void>success().buildFuture();
261         }
262         return RpcResultBuilder.<Void>failed().buildFuture();
263     }
264
265     @Override
266     public Future<RpcResult<java.lang.Void>> unregisterLoggingDcls() {
267         LOG.info("Unregistering the CarDataChangeListener(s)");
268         synchronized (carsDclRegistrations) {
269             int numListeners = 0;
270             for (ListenerRegistration<DataChangeListener> carsDclRegistration : carsDclRegistrations) {
271                 carsDclRegistration.close();
272                 numListeners++;
273             }
274             carsDclRegistrations.clear();
275             LOG.info("Unregistered {} CarDataChangeListener(s)", numListeners);
276         }
277         return RpcResultBuilder.<Void>success().buildFuture();
278     }
279
280     @Override
281     public Future<RpcResult<java.lang.Void>> unregisterLoggingDtcls() {
282         LOG.info("Unregistering the CarDataTreeChangeListener(s)");
283         synchronized (carsDtclRegistrations) {
284             int numListeners = 0;
285             for (ListenerRegistration<CarDataTreeChangeListener> carsDtclRegistration : carsDtclRegistrations) {
286                 carsDtclRegistration.close();
287                 numListeners++;
288             }
289             carsDtclRegistrations.clear();
290             LOG.info("Unregistered {} CaraDataTreeChangeListener(s)", numListeners);
291         }
292         return RpcResultBuilder.<Void>success().buildFuture();
293     }
294 }