fd47b7176b2411b2bb629d65c56c5a1bac7474af
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / impl / WriteTransactionsHandler.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider.impl;
10
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.ArrayList;
17 import java.util.HashSet;
18 import java.util.LinkedHashSet;
19 import java.util.List;
20 import java.util.Set;
21 import java.util.SplittableRandom;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nullable;
27 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
32 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
33 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
34 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
35 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
36 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
37 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutputBuilder;
38 import org.opendaylight.yangtools.yang.common.QName;
39 import org.opendaylight.yangtools.yang.common.RpcError;
40 import org.opendaylight.yangtools.yang.common.RpcResult;
41 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
44 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
45 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
46 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 public class WriteTransactionsHandler implements Runnable {
51
52     private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class);
53     private static final int SECOND_AS_NANO = 1000000000;
54     //2^20 as in the model
55     private static final int MAX_ITEM = 1048576;
56
57     private static final QName ID_INTS =
58             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints");
59     private static final QName ID =
60             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id");
61     private static final QName ITEM =
62             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item");
63     private static final QName NUMBER =
64             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number");
65
66     public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
67
68     private final DOMDataBroker domDataBroker;
69     private final Long timeToTake;
70     private final Long delay;
71     private final String id;
72     private final WriteTransactionsInput input;
73
74     private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
75     private final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
76     private final Set<Integer> usedValues = new HashSet<>();
77
78     private RandomnessProvider random;
79     private TxProvider txProvider;
80
81     private long startTime;
82     private SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture;
83
84     private long allTx = 0;
85     private long insertTx = 0;
86     private long deleteTx = 0;
87     private ScheduledFuture<?> scheduledFuture;
88     private YangInstanceIdentifier idListWithKey;
89
90     public WriteTransactionsHandler(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) {
91         this.domDataBroker = domDataBroker;
92         this.input = input;
93
94         timeToTake = input.getSeconds() * SECOND_AS_NANO;
95         delay = SECOND_AS_NANO / input.getTransactionsPerSecond();
96         id = input.getId();
97     }
98
99     @Override
100     public void run() {
101         final long current = System.nanoTime();
102
103         futures.add(execWrite());
104
105         maybeFinish(current);
106     }
107
108     public void start(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
109         LOG.debug("Starting write-transactions.");
110
111         if (input.isChainedTransactions()) {
112             txProvider = new TxChainBackedProvider(domDataBroker, settableFuture, executor);
113             random = new BasicProvider();
114         } else {
115             txProvider = new DataBrokerBackedProvider(domDataBroker);
116             random = new NonConflictingProvider();
117         }
118
119         if (ensureListExists(settableFuture) && fillInitialList(settableFuture)) {
120             startTime = System.nanoTime();
121             completionFuture = settableFuture;
122             scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
123         } else {
124             executor.shutdown();
125         }
126     }
127
128     private boolean ensureListExists(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
129
130         final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INTS, ID, id)
131                 .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
132                 .build();
133         final MapNode mapNode =
134                 ImmutableNodes.mapNodeBuilder(ID_INTS)
135                         .withChild(entry)
136                         .build();
137
138         final DOMDataWriteTransaction tx = txProvider.createTransaction();
139         idListWithKey = ID_INTS_YID.node(entry.getIdentifier());
140         tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, mapNode);
141
142         try {
143             tx.submit().checkedGet();
144         } catch (final TransactionCommitFailedException e) {
145             LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
146             settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
147                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
148             return false;
149         }
150
151         return true;
152     }
153
154     private boolean fillInitialList(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
155         LOG.debug("Filling the item list with initial values.");
156
157         final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM);
158         for (int i = 0; i < MAX_ITEM / 2; i++) {
159             usedValues.add(i);
160             mapBuilder.withChild(ImmutableNodes.mapEntry(ITEM, NUMBER, i));
161         }
162
163         final YangInstanceIdentifier itemListId = idListWithKey.node(ITEM);
164         final DOMDataWriteTransaction tx = txProvider.createTransaction();
165         tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build());
166
167         try {
168             tx.submit().checkedGet();
169         } catch (final TransactionCommitFailedException e) {
170             LOG.warn("Unable to fill the initial item list.", e);
171             settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
172                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
173             return false;
174         }
175
176         return true;
177     }
178
179     private CheckedFuture<Void, TransactionCommitFailedException> execWrite() {
180         final int i = random.nextInt(MAX_ITEM + 1);
181
182         final YangInstanceIdentifier entryId =
183                 idListWithKey.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i));
184
185         final DOMDataWriteTransaction tx = txProvider.createTransaction();
186         allTx++;
187
188         if (usedValues.contains(i)) {
189             LOG.debug("Deleting item: {}", i);
190             deleteTx++;
191             tx.delete(LogicalDatastoreType.CONFIGURATION, entryId);
192             usedValues.remove(i);
193
194         } else {
195             LOG.debug("Inserting item: {}", i);
196             insertTx++;
197             final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i);
198             tx.put(LogicalDatastoreType.CONFIGURATION, entryId, entry);
199             usedValues.add(i);
200         }
201
202         return tx.submit();
203     }
204
205     private void maybeFinish(final long current) {
206         if ((current - startTime) > timeToTake) {
207             LOG.debug("Reached max running time, waiting for futures to complete.");
208             scheduledFuture.cancel(false);
209
210             final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
211
212             Futures.addCallback(allFutures, new FutureCallback<List<Void>>() {
213                 @Override
214                 public void onSuccess(@Nullable final List<Void> result) {
215                     LOG.debug("All futures completed successfully.");
216
217                     final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder()
218                             .setAllTx(allTx)
219                             .setInsertTx(insertTx)
220                             .setDeleteTx(deleteTx)
221                             .build();
222
223                     completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>success()
224                             .withResult(output).build());
225
226                     executor.shutdown();
227                 }
228
229                 @Override
230                 public void onFailure(final Throwable t) {
231                     LOG.error("Write transactions failed.", t);
232                     completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
233                             .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", t).build());
234
235                     executor.shutdown();
236                 }
237             });
238         }
239     }
240
241     private interface RandomnessProvider {
242         int nextInt(int bound);
243     }
244
245     private static class NonConflictingProvider implements RandomnessProvider {
246
247         private final SplittableRandom random = new SplittableRandom();
248         private final LinkedHashSet<Integer> previousNumbers = new LinkedHashSet<>();
249
250         @Override
251         public int nextInt(int bound) {
252             int nextInt;
253             do {
254                 nextInt = random.nextInt(bound);
255             } while (previousNumbers.contains(nextInt));
256
257             if (previousNumbers.size() > 100000) {
258                 previousNumbers.iterator().remove();
259             }
260             previousNumbers.add(nextInt);
261
262             return nextInt;
263         }
264     }
265
266     private static class BasicProvider implements RandomnessProvider {
267
268         private final SplittableRandom random = new SplittableRandom();
269
270         @Override
271         public int nextInt(int bound) {
272             return random.nextInt(bound);
273         }
274     }
275
276     private interface TxProvider {
277
278         DOMDataWriteTransaction createTransaction();
279     }
280
281     private static class TxChainBackedProvider implements TxProvider {
282
283         private final DOMTransactionChain transactionChain;
284
285         TxChainBackedProvider(final DOMDataBroker dataBroker,
286                               final SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture,
287                               final ScheduledExecutorService executor) {
288
289             transactionChain =
290                     dataBroker.createTransactionChain(new TestChainListener(completionFuture, executor));
291         }
292
293         @Override
294         public DOMDataWriteTransaction createTransaction() {
295             return transactionChain.newWriteOnlyTransaction();
296         }
297     }
298
299     private static class DataBrokerBackedProvider implements TxProvider {
300
301         private final DOMDataBroker dataBroker;
302
303         DataBrokerBackedProvider(final DOMDataBroker dataBroker) {
304             this.dataBroker = dataBroker;
305         }
306
307         @Override
308         public DOMDataWriteTransaction createTransaction() {
309             return dataBroker.newWriteOnlyTransaction();
310         }
311     }
312
313     private static class TestChainListener implements TransactionChainListener {
314
315         private final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture;
316         private final ScheduledExecutorService executor;
317
318         TestChainListener(final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture,
319                           final ScheduledExecutorService executor) {
320
321             this.resultFuture = resultFuture;
322             this.executor = executor;
323         }
324
325         @Override
326         public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
327                                              final AsyncTransaction<?, ?> transaction,
328                                              final Throwable cause) {
329             LOG.warn("Transaction chain failed.", cause);
330             resultFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
331                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build());
332
333             executor.shutdown();
334         }
335
336         @Override
337         public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
338             LOG.debug("Transaction chain closed successfully.");
339         }
340     }
341 }