Nest id-ints list inside a container
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / impl / WriteTransactionsHandler.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider.impl;
10
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.ArrayList;
17 import java.util.HashSet;
18 import java.util.LinkedHashSet;
19 import java.util.List;
20 import java.util.Set;
21 import java.util.SplittableRandom;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nullable;
27 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
29 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
34 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
35 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
36 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
37 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
38 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutputBuilder;
39 import org.opendaylight.yangtools.yang.common.QName;
40 import org.opendaylight.yangtools.yang.common.RpcError;
41 import org.opendaylight.yangtools.yang.common.RpcResult;
42 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
46 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
47 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
48 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
49 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
50 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
51 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 public class WriteTransactionsHandler implements Runnable {
56
57     private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class);
58     private static final int SECOND_AS_NANO = 1000000000;
59     //2^20 as in the model
60     private static final int MAX_ITEM = 1048576;
61
62     private static final QName ID_INTS =
63             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints");
64     private static final QName ID_INT =
65             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int");
66     private static final QName ID =
67             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id");
68     private static final QName ITEM =
69             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item");
70     private static final QName NUMBER =
71             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number");
72
73     public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
74     public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT);
75
76     private final DOMDataBroker domDataBroker;
77     private final Long timeToTake;
78     private final Long delay;
79     private final String id;
80     private final WriteTransactionsInput input;
81
82     private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
83     private final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
84     private final Set<Integer> usedValues = new HashSet<>();
85
86     private RandomnessProvider random;
87     private TxProvider txProvider;
88
89     private long startTime;
90     private SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture;
91
92     private long allTx = 0;
93     private long insertTx = 0;
94     private long deleteTx = 0;
95     private ScheduledFuture<?> scheduledFuture;
96     private YangInstanceIdentifier idListWithKey;
97
98     public WriteTransactionsHandler(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) {
99         this.domDataBroker = domDataBroker;
100         this.input = input;
101
102         timeToTake = input.getSeconds() * SECOND_AS_NANO;
103         delay = SECOND_AS_NANO / input.getTransactionsPerSecond();
104         id = input.getId();
105     }
106
107     @Override
108     public void run() {
109         final long current = System.nanoTime();
110
111         futures.add(execWrite());
112
113         maybeFinish(current);
114     }
115
116     public void start(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
117         LOG.debug("Starting write-transactions.");
118
119         if (input.isChainedTransactions()) {
120             txProvider = new TxChainBackedProvider(domDataBroker, settableFuture, executor);
121             random = new BasicProvider();
122         } else {
123             txProvider = new DataBrokerBackedProvider(domDataBroker);
124             random = new NonConflictingProvider();
125         }
126
127         if (ensureListExists(settableFuture) && fillInitialList(settableFuture)) {
128             startTime = System.nanoTime();
129             completionFuture = settableFuture;
130             scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
131         } else {
132             executor.shutdown();
133         }
134     }
135
136     private boolean ensureListExists(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
137
138         final ContainerNode containerNode = ImmutableContainerNodeBuilder.create()
139                 .withNodeIdentifier(new NodeIdentifier(ID_INTS))
140                 .withChild(ImmutableNodes.mapNodeBuilder(ID_INT).build())
141                 .build();
142
143         DOMDataWriteTransaction tx = txProvider.createTransaction();
144         // write only the top list
145         tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, containerNode);
146         try {
147             tx.submit().checkedGet();
148         } catch (final OptimisticLockFailedException e) {
149             // when multiple write-transactions are executed concurrently we need to ignore this.
150             // If we get optimistic lock here it means id-ints already exists and we can continue.
151             LOG.debug("Got an optimistic lock when writing initial top level list element.", e);
152         } catch (final TransactionCommitFailedException e) {
153             LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
154             settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
155                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
156             return false;
157         }
158
159         final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id)
160                 .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
161                 .build();
162
163         idListWithKey = ID_INT_YID.node(entry.getIdentifier());
164         tx = txProvider.createTransaction();
165         tx.merge(LogicalDatastoreType.CONFIGURATION, idListWithKey, entry);
166
167         try {
168             tx.submit().checkedGet();
169         } catch (final TransactionCommitFailedException e) {
170             LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
171             settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
172                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
173             return false;
174         }
175
176         return true;
177     }
178
179     private boolean fillInitialList(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
180         LOG.debug("Filling the item list with initial values.");
181
182         final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM);
183         for (int i = 0; i < MAX_ITEM / 2; i++) {
184             usedValues.add(i);
185             mapBuilder.withChild(ImmutableNodes.mapEntry(ITEM, NUMBER, i));
186         }
187
188         final YangInstanceIdentifier itemListId = idListWithKey.node(ITEM);
189         final DOMDataWriteTransaction tx = txProvider.createTransaction();
190         tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build());
191
192         try {
193             tx.submit().checkedGet();
194         } catch (final TransactionCommitFailedException e) {
195             LOG.warn("Unable to fill the initial item list.", e);
196             settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
197                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
198             return false;
199         }
200
201         return true;
202     }
203
204     private CheckedFuture<Void, TransactionCommitFailedException> execWrite() {
205         final int i = random.nextInt(MAX_ITEM + 1);
206
207         final YangInstanceIdentifier entryId =
208                 idListWithKey.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i));
209
210         final DOMDataWriteTransaction tx = txProvider.createTransaction();
211         allTx++;
212
213         if (usedValues.contains(i)) {
214             LOG.debug("Deleting item: {}", i);
215             deleteTx++;
216             tx.delete(LogicalDatastoreType.CONFIGURATION, entryId);
217             usedValues.remove(i);
218
219         } else {
220             LOG.debug("Inserting item: {}", i);
221             insertTx++;
222             final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i);
223             tx.put(LogicalDatastoreType.CONFIGURATION, entryId, entry);
224             usedValues.add(i);
225         }
226
227         return tx.submit();
228     }
229
230     private void maybeFinish(final long current) {
231         if ((current - startTime) > timeToTake) {
232             LOG.debug("Reached max running time, waiting for futures to complete.");
233             scheduledFuture.cancel(false);
234
235             final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
236
237             Futures.addCallback(allFutures, new FutureCallback<List<Void>>() {
238                 @Override
239                 public void onSuccess(@Nullable final List<Void> result) {
240                     LOG.debug("All futures completed successfully.");
241
242                     final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder()
243                             .setAllTx(allTx)
244                             .setInsertTx(insertTx)
245                             .setDeleteTx(deleteTx)
246                             .build();
247
248                     completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>success()
249                             .withResult(output).build());
250
251                     executor.shutdown();
252                 }
253
254                 @Override
255                 public void onFailure(final Throwable t) {
256                     LOG.error("Write transactions failed.", t);
257                     completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
258                             .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", t).build());
259
260                     executor.shutdown();
261                 }
262             });
263         }
264     }
265
266     private interface RandomnessProvider {
267         int nextInt(int bound);
268     }
269
270     private static class NonConflictingProvider implements RandomnessProvider {
271
272         private final SplittableRandom random = new SplittableRandom();
273         private final LinkedHashSet<Integer> previousNumbers = new LinkedHashSet<>();
274
275         @Override
276         public int nextInt(int bound) {
277             int nextInt;
278             do {
279                 nextInt = random.nextInt(bound);
280             } while (previousNumbers.contains(nextInt));
281
282             if (previousNumbers.size() > 100000) {
283                 previousNumbers.iterator().remove();
284             }
285             previousNumbers.add(nextInt);
286
287             return nextInt;
288         }
289     }
290
291     private static class BasicProvider implements RandomnessProvider {
292
293         private final SplittableRandom random = new SplittableRandom();
294
295         @Override
296         public int nextInt(int bound) {
297             return random.nextInt(bound);
298         }
299     }
300
301     private interface TxProvider {
302
303         DOMDataWriteTransaction createTransaction();
304     }
305
306     private static class TxChainBackedProvider implements TxProvider {
307
308         private final DOMTransactionChain transactionChain;
309
310         TxChainBackedProvider(final DOMDataBroker dataBroker,
311                               final SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture,
312                               final ScheduledExecutorService executor) {
313
314             transactionChain =
315                     dataBroker.createTransactionChain(new TestChainListener(completionFuture, executor));
316         }
317
318         @Override
319         public DOMDataWriteTransaction createTransaction() {
320             return transactionChain.newWriteOnlyTransaction();
321         }
322     }
323
324     private static class DataBrokerBackedProvider implements TxProvider {
325
326         private final DOMDataBroker dataBroker;
327
328         DataBrokerBackedProvider(final DOMDataBroker dataBroker) {
329             this.dataBroker = dataBroker;
330         }
331
332         @Override
333         public DOMDataWriteTransaction createTransaction() {
334             return dataBroker.newWriteOnlyTransaction();
335         }
336     }
337
338     private static class TestChainListener implements TransactionChainListener {
339
340         private final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture;
341         private final ScheduledExecutorService executor;
342
343         TestChainListener(final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture,
344                           final ScheduledExecutorService executor) {
345
346             this.resultFuture = resultFuture;
347             this.executor = executor;
348         }
349
350         @Override
351         public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
352                                              final AsyncTransaction<?, ?> transaction,
353                                              final Throwable cause) {
354             LOG.warn("Transaction chain failed.", cause);
355             resultFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
356                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build());
357
358             executor.shutdown();
359         }
360
361         @Override
362         public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
363             LOG.debug("Transaction chain closed successfully.");
364         }
365     }
366 }