Bump versions to 4.0.0-SNAPSHOT
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / CarProvider.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.clustering.it.provider;
9
10 import com.google.common.base.Stopwatch;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.util.Set;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import java.util.concurrent.atomic.AtomicLong;
23 import java.util.concurrent.atomic.AtomicReference;
24 import org.opendaylight.mdsal.binding.api.DataBroker;
25 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
26 import org.opendaylight.mdsal.binding.api.WriteTransaction;
27 import org.opendaylight.mdsal.common.api.CommitInfo;
28 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
29 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
33 import org.opendaylight.mdsal.eos.binding.api.Entity;
34 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipChange;
35 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListener;
36 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
37 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarId;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarService;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.CarsBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterCommitCohortInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterCommitCohortOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterCommitCohortOutputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterLoggingDtclInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterLoggingDtclOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterLoggingDtclOutputBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterOwnershipInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterOwnershipOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.RegisterOwnershipOutputBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StopStressTestInput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StopStressTestOutput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StopStressTestOutputBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StressTestInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StressTestOutput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.StressTestOutputBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterCommitCohortInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterCommitCohortOutput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterCommitCohortOutputBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterLoggingDtclsInput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterLoggingDtclsOutput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterLoggingDtclsOutputBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterOwnershipInput;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterOwnershipOutput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.UnregisterOwnershipOutputBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.cars.CarEntry;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.cars.CarEntryBuilder;
68 import org.opendaylight.yangtools.concepts.ListenerRegistration;
69 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
70 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
71 import org.opendaylight.yangtools.yang.common.RpcResult;
72 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
73 import org.opendaylight.yangtools.yang.common.Uint32;
74 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
75 import org.slf4j.Logger;
76 import org.slf4j.LoggerFactory;
77
78 /**
79  * Implementation of CarService.
80  *
81  * @author Thomas Pantelis
82  */
83 @SuppressFBWarnings("SLF4J_ILLEGAL_PASSED_CLASS")
84 public class CarProvider implements CarService {
85     private static final Logger LOG_PURCHASE_CAR = LoggerFactory.getLogger(PurchaseCarProvider.class);
86
87     private static final Logger LOG_CAR_PROVIDER = LoggerFactory.getLogger(CarProvider.class);
88
89     private static final String ENTITY_TYPE = "cars";
90     private static final InstanceIdentifier<Cars> CARS_IID = InstanceIdentifier.builder(Cars.class).build();
91     private static final DataTreeIdentifier<Cars> CARS_DTID = DataTreeIdentifier.create(
92             LogicalDatastoreType.CONFIGURATION, CARS_IID);
93
94     private final DataBroker dataProvider;
95     private final DOMDataBroker domDataBroker;
96     private final EntityOwnershipService ownershipService;
97     private final AtomicLong succcessCounter = new AtomicLong();
98     private final AtomicLong failureCounter = new AtomicLong();
99
100     private final CarEntityOwnershipListener ownershipListener = new CarEntityOwnershipListener();
101     private final AtomicBoolean registeredListener = new AtomicBoolean();
102
103     private final Set<ListenerRegistration<?>> carsDclRegistrations = ConcurrentHashMap.newKeySet();
104     private final Set<ListenerRegistration<CarDataTreeChangeListener>> carsDtclRegistrations =
105             ConcurrentHashMap.newKeySet();
106
107     private volatile Thread testThread;
108     private volatile boolean stopThread;
109     private final AtomicReference<DOMDataTreeCommitCohortRegistration<CarEntryDataTreeCommitCohort>> commitCohortReg =
110             new AtomicReference<>();
111
112     public CarProvider(final DataBroker dataProvider, final EntityOwnershipService ownershipService,
113             final DOMDataBroker domDataBroker) {
114         this.dataProvider = dataProvider;
115         this.ownershipService = ownershipService;
116         this.domDataBroker = domDataBroker;
117     }
118
119     public void close() {
120         stopThread();
121         closeCommitCohortRegistration();
122     }
123
124     private void stopThread() {
125         if (testThread != null) {
126             stopThread = true;
127             testThread.interrupt();
128             try {
129                 testThread.join();
130             } catch (InterruptedException e) {
131                 // don't care
132             }
133             testThread = null;
134         }
135     }
136
137     @Override
138     public ListenableFuture<RpcResult<StressTestOutput>> stressTest(final StressTestInput input) {
139         final int inputRate;
140         final long inputCount;
141
142         // If rate is not provided, or given as zero, then just return.
143         if (input.getRate() == null || input.getRate().toJava() == 0) {
144             LOG_PURCHASE_CAR.info("Exiting stress test as no rate is given.");
145             return Futures.immediateFuture(RpcResultBuilder.<StressTestOutput>failed()
146                     .withError(ErrorType.PROTOCOL, "invalid rate")
147                     .build());
148         }
149
150         inputRate = input.getRate().toJava();
151         if (input.getCount() != null) {
152             inputCount = input.getCount().toJava();
153         } else {
154             inputCount = 0;
155         }
156
157         LOG_PURCHASE_CAR.info("Stress test starting : rate: {} count: {}", inputRate, inputCount);
158
159         stopThread();
160         // clear counters
161         succcessCounter.set(0);
162         failureCounter.set(0);
163
164         WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
165         InstanceIdentifier<Cars> carsId = InstanceIdentifier.create(Cars.class);
166         tx.merge(LogicalDatastoreType.CONFIGURATION, carsId, new CarsBuilder().build());
167         try {
168             tx.commit().get(5, TimeUnit.SECONDS);
169         } catch (TimeoutException | InterruptedException | ExecutionException e) {
170             LOG_PURCHASE_CAR.error("Put Cars failed",e);
171             return Futures.immediateFuture(RpcResultBuilder.success(new StressTestOutputBuilder().build()).build());
172         }
173
174         stopThread = false;
175         final long sleep = TimeUnit.NANOSECONDS.convert(1000,TimeUnit.MILLISECONDS) / inputRate;
176         final Stopwatch sw = Stopwatch.createUnstarted();
177         testThread = new Thread(() -> {
178             sw.start();
179             AtomicLong count = new AtomicLong();
180             while (!stopThread) {
181                 long id = count.incrementAndGet();
182                 WriteTransaction tx1 = dataProvider.newWriteOnlyTransaction();
183                 CarEntry car = new CarEntryBuilder().setId(new CarId("car" + id)).build();
184                 tx1.put(LogicalDatastoreType.CONFIGURATION,
185                         InstanceIdentifier.<Cars>builder(Cars.class).child(CarEntry.class, car.key()).build(), car);
186                 tx1.commit().addCallback(new FutureCallback<CommitInfo>() {
187
188                     @Override
189                     public void onSuccess(final CommitInfo result) {
190                         // Transaction succeeded
191                         succcessCounter.getAndIncrement();
192                     }
193
194                     @Override
195                     public void onFailure(final Throwable ex) {
196                         // Transaction failed
197                         failureCounter.getAndIncrement();
198                         LOG_CAR_PROVIDER.error("Put Cars failed", ex);
199                     }
200                 }, MoreExecutors.directExecutor());
201                 try {
202                     TimeUnit.NANOSECONDS.sleep(sleep);
203                 } catch (InterruptedException e) {
204                     break;
205                 }
206
207                 if (count.get() % 1000 == 0) {
208                     LOG_PURCHASE_CAR.info("Cars created {}, time: {}", count.get(), sw.elapsed(TimeUnit.SECONDS));
209                 }
210
211                 // Check if a count is specified in input and we have created that many cars.
212                 if (inputCount != 0 && count.get() >= inputCount) {
213                     stopThread = true;
214                 }
215             }
216
217             LOG_PURCHASE_CAR.info("Stress test thread stopping after creating {} cars.", count.get());
218         });
219         testThread.start();
220
221         return Futures.immediateFuture(RpcResultBuilder.success(new StressTestOutputBuilder().build()).build());
222     }
223
224     @Override
225     public ListenableFuture<RpcResult<StopStressTestOutput>> stopStressTest(final StopStressTestInput input) {
226         stopThread();
227         StopStressTestOutputBuilder stopStressTestOutput;
228         stopStressTestOutput = new StopStressTestOutputBuilder()
229                 .setSuccessCount(Uint32.valueOf(succcessCounter.longValue()))
230                 .setFailureCount(Uint32.valueOf(failureCounter.longValue()));
231
232         final StopStressTestOutput result = stopStressTestOutput.build();
233         LOG_PURCHASE_CAR.info("Executed Stop Stress test; No. of cars created {}; "
234                 + "No. of cars failed {}; ", succcessCounter, failureCounter);
235         // clear counters
236         succcessCounter.set(0);
237         failureCounter.set(0);
238         return Futures.immediateFuture(RpcResultBuilder.<StopStressTestOutput>success(result).build());
239     }
240
241
242     @Override
243     public ListenableFuture<RpcResult<RegisterOwnershipOutput>> registerOwnership(final RegisterOwnershipInput input) {
244         if (registeredListener.compareAndSet(false, true)) {
245             ownershipService.registerListener(ENTITY_TYPE, ownershipListener);
246         }
247
248         Entity entity = new Entity(ENTITY_TYPE, input.getCarId());
249         try {
250             ownershipService.registerCandidate(entity);
251         } catch (CandidateAlreadyRegisteredException e) {
252             return RpcResultBuilder.<RegisterOwnershipOutput>failed().withError(ErrorType.APPLICATION,
253                     "Could not register for car " + input.getCarId(), e).buildFuture();
254         }
255
256         return RpcResultBuilder.success(new RegisterOwnershipOutputBuilder().build()).buildFuture();
257     }
258
259     @Override
260     public ListenableFuture<RpcResult<UnregisterOwnershipOutput>> unregisterOwnership(
261             final UnregisterOwnershipInput input) {
262         return RpcResultBuilder.success(new UnregisterOwnershipOutputBuilder().build()).buildFuture();
263     }
264
265     private static class CarEntityOwnershipListener implements EntityOwnershipListener {
266         @Override
267         public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
268             LOG_CAR_PROVIDER.info("ownershipChanged: {}", ownershipChange);
269         }
270     }
271
272     @Override
273     public ListenableFuture<RpcResult<RegisterLoggingDtclOutput>> registerLoggingDtcl(
274             final RegisterLoggingDtclInput input) {
275         LOG_CAR_PROVIDER.info("Registering a new CarDataTreeChangeListener");
276         final ListenerRegistration<CarDataTreeChangeListener> carsDtclRegistration =
277                 dataProvider.registerDataTreeChangeListener(CARS_DTID, new CarDataTreeChangeListener());
278
279         carsDtclRegistrations.add(carsDtclRegistration);
280         return RpcResultBuilder.success(new RegisterLoggingDtclOutputBuilder().build()).buildFuture();
281     }
282
283     @Override
284     public ListenableFuture<RpcResult<UnregisterLoggingDtclsOutput>> unregisterLoggingDtcls(
285             final UnregisterLoggingDtclsInput input) {
286         LOG_CAR_PROVIDER.info("Unregistering the CarDataTreeChangeListener(s)");
287         synchronized (carsDtclRegistrations) {
288             int numListeners = 0;
289             for (ListenerRegistration<CarDataTreeChangeListener> carsDtclRegistration : carsDtclRegistrations) {
290                 carsDtclRegistration.close();
291                 numListeners++;
292             }
293             carsDtclRegistrations.clear();
294             LOG_CAR_PROVIDER.info("Unregistered {} CaraDataTreeChangeListener(s)", numListeners);
295         }
296         return RpcResultBuilder.success(new UnregisterLoggingDtclsOutputBuilder().build()).buildFuture();
297     }
298
299     @Override
300     @SuppressWarnings("checkstyle:IllegalCatch")
301     public ListenableFuture<RpcResult<UnregisterCommitCohortOutput>> unregisterCommitCohort(
302             final UnregisterCommitCohortInput input) {
303         closeCommitCohortRegistration();
304
305         return RpcResultBuilder.success(new UnregisterCommitCohortOutputBuilder().build()).buildFuture();
306     }
307
308     private void closeCommitCohortRegistration() {
309         final DOMDataTreeCommitCohortRegistration<CarEntryDataTreeCommitCohort> reg = commitCohortReg.getAndSet(null);
310         if (reg != null) {
311             reg.close();
312             LOG_CAR_PROVIDER.info("Unregistered commit cohort");
313         }
314     }
315
316     @Override
317     public synchronized ListenableFuture<RpcResult<RegisterCommitCohortOutput>> registerCommitCohort(
318             final RegisterCommitCohortInput input) {
319         if (commitCohortReg.get() != null) {
320             return RpcResultBuilder.success(new RegisterCommitCohortOutputBuilder().build()).buildFuture();
321         }
322
323         final DOMDataTreeCommitCohortRegistry commitCohortRegistry = domDataBroker.getExtensions().getInstance(
324             DOMDataTreeCommitCohortRegistry.class);
325
326         if (commitCohortRegistry == null) {
327             // Shouldn't happen
328             return RpcResultBuilder.<RegisterCommitCohortOutput>failed().withError(ErrorType.APPLICATION,
329                     "DOMDataTreeCommitCohortRegistry not found").buildFuture();
330         }
331
332         // Note: it may look strange that we specify the CarEntry.QNAME twice in the path below. This must be done in
333         // order to register the commit cohort for CarEntry instances. In the underlying data tree, a yang list is
334         // represented as a MapNode with MapEntryNodes representing the child list entries. Therefore, in order to
335         // address a list entry, you must specify the path argument for the MapNode and the path argument for the
336         // MapEntryNode. In the path below, the first CarEntry.QNAME argument addresses the MapNode and, since we want
337         // to address all list entries, the second path argument is wild-carded by specifying just the CarEntry.QNAME.
338         final YangInstanceIdentifier carEntryPath = YangInstanceIdentifier.builder(
339                 YangInstanceIdentifier.of(Cars.QNAME)).node(CarEntry.QNAME).node(CarEntry.QNAME).build();
340         commitCohortReg.set(commitCohortRegistry.registerCommitCohort(new DOMDataTreeIdentifier(
341             LogicalDatastoreType.CONFIGURATION, carEntryPath), new CarEntryDataTreeCommitCohort()));
342
343         LOG_CAR_PROVIDER.info("Registered commit cohort");
344
345         return RpcResultBuilder.success(new RegisterCommitCohortOutputBuilder().build()).buildFuture();
346     }
347 }