Cleanup ProduceTransactionsHandler
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / impl / WriteTransactionsHandler.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider.impl;
10
11 import com.google.common.base.Stopwatch;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.ArrayList;
18 import java.util.HashSet;
19 import java.util.LinkedHashSet;
20 import java.util.List;
21 import java.util.Set;
22 import java.util.SplittableRandom;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.ScheduledFuture;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
31 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
33 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
34 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
35 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
36 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
37 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
38 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
39 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
40 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutputBuilder;
41 import org.opendaylight.yangtools.yang.common.QName;
42 import org.opendaylight.yangtools.yang.common.RpcError;
43 import org.opendaylight.yangtools.yang.common.RpcResult;
44 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
47 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
48 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
49 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
50 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
51 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
52 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 public class WriteTransactionsHandler implements Runnable {
57
58     private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class);
59     private static final int SECOND_AS_NANO = 1000000000;
60     //2^20 as in the model
61     private static final int MAX_ITEM = 1048576;
62
63     private static final QName ID_INTS =
64             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
65     private static final QName ID_INT =
66             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
67     private static final QName ID =
68             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
69     private static final QName ITEM =
70             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
71     private static final QName NUMBER =
72             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
73
74     public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
75     public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
76
77     private final WriteTransactionsInput input;
78
79     private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
80     private final List<ListenableFuture<Void>> futures = new ArrayList<>();
81     private final Set<Integer> usedValues = new HashSet<>();
82
83     private RandomnessProvider random;
84     private TxProvider txProvider;
85
86     private final DOMDataBroker domDataBroker;
87     private final Long runtimeNanos;
88     private final Long delayNanos;
89     private final String id;
90
91     private SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture;
92     private Stopwatch stopwatch;
93
94     private long allTx = 0;
95     private long insertTx = 0;
96     private long deleteTx = 0;
97     private ScheduledFuture<?> scheduledFuture;
98     private YangInstanceIdentifier idListItem;
99
100     public WriteTransactionsHandler(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) {
101         this.domDataBroker = domDataBroker;
102         this.input = input;
103
104         runtimeNanos = TimeUnit.SECONDS.toNanos(input.getSeconds());
105         delayNanos = SECOND_AS_NANO / input.getTransactionsPerSecond();
106         id = input.getId();
107     }
108
109     @Override
110     public void run() {
111         futures.add(execWrite(futures.size()));
112         maybeFinish();
113     }
114
115     public void start(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
116         LOG.debug("Starting write-transactions.");
117
118         if (input.isChainedTransactions()) {
119             txProvider = new TxChainBackedProvider(domDataBroker, settableFuture, executor);
120             random = new BasicProvider();
121         } else {
122             txProvider = new DataBrokerBackedProvider(domDataBroker);
123             random = new NonConflictingProvider();
124         }
125
126         if (ensureListExists(settableFuture) && fillInitialList(settableFuture)) {
127             stopwatch = Stopwatch.createStarted();
128             completionFuture = settableFuture;
129             scheduledFuture = executor.scheduleAtFixedRate(this, 0, delayNanos, TimeUnit.NANOSECONDS);
130         } else {
131             executor.shutdown();
132         }
133     }
134
135     private boolean ensureListExists(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
136
137         final ContainerNode containerNode = ImmutableContainerNodeBuilder.create()
138                 .withNodeIdentifier(new NodeIdentifier(ID_INTS))
139                 .withChild(ImmutableNodes.mapNodeBuilder(ID_INT).build())
140                 .build();
141
142         DOMDataWriteTransaction tx = txProvider.createTransaction();
143         // write only the top list
144         tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, containerNode);
145         try {
146             tx.submit().checkedGet(125, TimeUnit.SECONDS);
147         } catch (final OptimisticLockFailedException e) {
148             // when multiple write-transactions are executed concurrently we need to ignore this.
149             // If we get optimistic lock here it means id-ints already exists and we can continue.
150             LOG.debug("Got an optimistic lock when writing initial top level list element.", e);
151         } catch (final TransactionCommitFailedException | TimeoutException e) {
152             LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
153             settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
154                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
155             return false;
156         }
157
158         final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id)
159                 .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
160                 .build();
161
162         idListItem = ID_INT_YID.node(entry.getIdentifier());
163         tx = txProvider.createTransaction();
164         tx.merge(LogicalDatastoreType.CONFIGURATION, idListItem, entry);
165
166         try {
167             tx.submit().checkedGet(125, TimeUnit.SECONDS);
168             return true;
169         } catch (final Exception e) {
170             LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
171             settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
172                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
173             return false;
174         }
175     }
176
177     private boolean fillInitialList(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
178         LOG.debug("Filling the item list with initial values.");
179
180         final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM);
181
182         final YangInstanceIdentifier itemListId = idListItem.node(ITEM);
183         final DOMDataWriteTransaction tx = txProvider.createTransaction();
184         tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build());
185
186         try {
187             tx.submit().checkedGet(125, TimeUnit.SECONDS);
188             return true;
189         } catch (final Exception e) {
190             LOG.warn("Unable to fill the initial item list.", e);
191             settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
192                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
193             return false;
194         }
195     }
196
197     private ListenableFuture<Void> execWrite(final int offset) {
198         final int i = random.nextInt(MAX_ITEM + 1);
199
200         final YangInstanceIdentifier entryId =
201                 idListItem.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i));
202
203         final DOMDataWriteTransaction tx = txProvider.createTransaction();
204         allTx++;
205
206         if (usedValues.contains(i)) {
207             LOG.debug("Deleting item: {}", i);
208             deleteTx++;
209             tx.delete(LogicalDatastoreType.CONFIGURATION, entryId);
210             usedValues.remove(i);
211
212         } else {
213             LOG.debug("Inserting item: {}", i);
214             insertTx++;
215             final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i);
216             tx.put(LogicalDatastoreType.CONFIGURATION, entryId, entry);
217             usedValues.add(i);
218         }
219
220         final ListenableFuture<Void> future = tx.submit();
221         if (LOG.isDebugEnabled()) {
222             Futures.addCallback(future, new FutureCallback<Void>() {
223                 @Override
224                 public void onSuccess(final Void result) {
225                     LOG.debug("Future #{} completed successfully", offset);
226                 }
227
228                 @Override
229                 public void onFailure(final Throwable cause) {
230                     LOG.debug("Future #{} failed", offset, cause);
231                 }
232             });
233         }
234
235         return future;
236     }
237
238     private void maybeFinish() {
239         final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
240         if (elapsed >= runtimeNanos) {
241             LOG.debug("Reached max running time, waiting for futures to complete.");
242             scheduledFuture.cancel(false);
243
244             final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
245
246             try {
247                 // Timeout from cds should be 2 minutes so leave some leeway.
248                 allFutures.get(125, TimeUnit.SECONDS);
249
250                 LOG.debug("All futures completed successfully.");
251
252                 final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder()
253                         .setAllTx(allTx)
254                         .setInsertTx(insertTx)
255                         .setDeleteTx(deleteTx)
256                         .build();
257
258                 completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>success()
259                         .withResult(output).build());
260
261                 executor.shutdown();
262             } catch (final ExecutionException e) {
263                 LOG.error("Write transactions failed.", e.getCause());
264
265                 completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
266                         .withError(RpcError.ErrorType.APPLICATION, "Submit failed", e.getCause()).build());
267             } catch (InterruptedException | TimeoutException e) {
268                 LOG.error("Write transactions failed.", e);
269
270                 completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
271                         .withError(RpcError.ErrorType.APPLICATION,
272                                 "Final submit was timed out by the test provider or was interrupted", e).build());
273
274                 for (int i = 0; i < futures.size(); i++) {
275                     final ListenableFuture<Void> future = futures.get(i);
276
277                     try {
278                         future.get(0, TimeUnit.NANOSECONDS);
279                     } catch (final TimeoutException fe) {
280                         LOG.warn("Future #{}/{} not completed yet", i, futures.size());
281                     } catch (final ExecutionException fe) {
282                         LOG.warn("Future #{}/{} failed", i, futures.size(), e.getCause());
283                     } catch (final InterruptedException fe) {
284                         LOG.warn("Interrupted while examining future #{}/{}", i, futures.size(), e);
285                     }
286                 }
287             } catch (Exception exception) {
288                 LOG.error("Write transactions failed.", exception);
289                 completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
290                         .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", exception).build());
291
292                 executor.shutdown();
293             }
294         }
295     }
296
297     private interface RandomnessProvider {
298         int nextInt(int bound);
299     }
300
301     private static class NonConflictingProvider implements RandomnessProvider {
302
303         private final SplittableRandom random = new SplittableRandom();
304         private final LinkedHashSet<Integer> previousNumbers = new LinkedHashSet<>();
305
306         @Override
307         public int nextInt(int bound) {
308             int nextInt;
309             do {
310                 nextInt = random.nextInt(bound);
311             } while (previousNumbers.contains(nextInt));
312
313             if (previousNumbers.size() > 100000) {
314                 previousNumbers.iterator().remove();
315             }
316             previousNumbers.add(nextInt);
317
318             return nextInt;
319         }
320     }
321
322     private static class BasicProvider implements RandomnessProvider {
323
324         private final SplittableRandom random = new SplittableRandom();
325
326         @Override
327         public int nextInt(int bound) {
328             return random.nextInt(bound);
329         }
330     }
331
332     private interface TxProvider {
333
334         DOMDataWriteTransaction createTransaction();
335     }
336
337     private static class TxChainBackedProvider implements TxProvider {
338
339         private final DOMTransactionChain transactionChain;
340
341         TxChainBackedProvider(final DOMDataBroker dataBroker,
342                               final SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture,
343                               final ScheduledExecutorService executor) {
344
345             transactionChain =
346                     dataBroker.createTransactionChain(new TestChainListener(completionFuture, executor));
347         }
348
349         @Override
350         public DOMDataWriteTransaction createTransaction() {
351             return transactionChain.newWriteOnlyTransaction();
352         }
353     }
354
355     private static class DataBrokerBackedProvider implements TxProvider {
356
357         private final DOMDataBroker dataBroker;
358
359         DataBrokerBackedProvider(final DOMDataBroker dataBroker) {
360             this.dataBroker = dataBroker;
361         }
362
363         @Override
364         public DOMDataWriteTransaction createTransaction() {
365             return dataBroker.newWriteOnlyTransaction();
366         }
367     }
368
369     private static class TestChainListener implements TransactionChainListener {
370
371         private final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture;
372         private final ScheduledExecutorService executor;
373
374         TestChainListener(final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture,
375                           final ScheduledExecutorService executor) {
376
377             this.resultFuture = resultFuture;
378             this.executor = executor;
379         }
380
381         @Override
382         public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
383                                              final AsyncTransaction<?, ?> transaction,
384                                              final Throwable cause) {
385             LOG.warn("Transaction chain failed.", cause);
386             resultFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
387                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build());
388
389             executor.shutdown();
390         }
391
392         @Override
393         public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
394             LOG.debug("Transaction chain closed successfully.");
395         }
396     }
397 }