Cleanup ProduceTransactionsHandler
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / impl / ProduceTransactionsHandler.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider.impl;
10
11 import com.google.common.base.Stopwatch;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Set;
21 import java.util.SplittableRandom;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
34 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
35 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
36 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
37 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutputBuilder;
38 import org.opendaylight.yangtools.yang.common.QName;
39 import org.opendaylight.yangtools.yang.common.RpcError;
40 import org.opendaylight.yangtools.yang.common.RpcResult;
41 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
44 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
45 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
46 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 public class ProduceTransactionsHandler implements Runnable {
51
52     private static final Logger LOG = LoggerFactory.getLogger(ProduceTransactionsHandler.class);
53     private static final int SECOND_AS_NANO = 1000000000;
54     //2^20 as in the model
55     private static final int MAX_ITEM = 1048576;
56
57     static final QName ID_INTS =
58             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
59     public static final QName ID_INT =
60             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
61     static final QName ID =
62             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
63     static final QName ITEM =
64             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
65     private static final QName NUMBER =
66             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number".intern());
67
68     public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
69     public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
70
71     private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
72     private final List<ListenableFuture<Void>> futures = new ArrayList<>();
73     private final Set<Integer> usedValues = new HashSet<>();
74     private final SplittableRandom random = new SplittableRandom();
75
76     private final DOMDataTreeService domDataTreeService;
77     private final long runtimeNanos;
78     private final long delayNanos;
79     private final String id;
80
81     private SettableFuture<RpcResult<ProduceTransactionsOutput>> completionFuture;
82     private Stopwatch stopwatch;
83
84     private long allTx = 0;
85     private long insertTx = 0;
86     private long deleteTx = 0;
87     private ScheduledFuture<?> scheduledFuture;
88     private DOMDataTreeProducer itemProducer;
89     private DOMDataTreeIdentifier idListItem;
90
91     public ProduceTransactionsHandler(final DOMDataTreeService domDataTreeService,
92                                       final ProduceTransactionsInput input) {
93
94         this.domDataTreeService = domDataTreeService;
95
96         runtimeNanos = TimeUnit.SECONDS.toNanos(input.getSeconds());
97         delayNanos = SECOND_AS_NANO / input.getTransactionsPerSecond();
98         id = input.getId();
99     }
100
101     @Override
102     public void run() {
103         futures.add(execWrite(futures.size()));
104         maybeFinish();
105     }
106
107     public void start(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
108         completionFuture = settableFuture;
109
110         if (fillInitialList(completionFuture)) {
111             stopwatch = Stopwatch.createStarted();
112             scheduledFuture = executor.scheduleAtFixedRate(this, 0, delayNanos, TimeUnit.NANOSECONDS);
113         } else {
114             executor.shutdown();
115         }
116     }
117
118     private boolean fillInitialList(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
119         LOG.debug("Filling the item list with initial values.");
120
121         final YangInstanceIdentifier idListWithKey = ID_INT_YID.node(new NodeIdentifierWithPredicates(ID_INT, ID, id));
122
123         itemProducer = domDataTreeService.createProducer(
124                 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)));
125
126         final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
127         final DOMDataTreeWriteCursor cursor =
128                 tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey));
129
130         final MapNode list = ImmutableNodes.mapNodeBuilder(ITEM).build();
131         cursor.write(list.getIdentifier(), list);
132         cursor.close();
133
134         idListItem = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
135             idListWithKey.node(list.getIdentifier()).toOptimized());
136
137         try {
138             tx.submit().checkedGet(125, TimeUnit.SECONDS);
139             return true;
140         } catch (final Exception e) {
141             LOG.warn("Unable to fill the initial item list.", e);
142             settableFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
143                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
144         }
145
146         try {
147             itemProducer.close();
148         } catch (final DOMDataTreeProducerException exception) {
149             LOG.warn("Failure while closing producer.", exception);
150         }
151         return false;
152     }
153
154     private ListenableFuture<Void> execWrite(final int offset) {
155         final int i = random.nextInt(MAX_ITEM + 1);
156         final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
157         final DOMDataTreeWriteCursor cursor = tx.createCursor(idListItem);
158
159         allTx++;
160
161         final NodeIdentifierWithPredicates entryId = new NodeIdentifierWithPredicates(ITEM, NUMBER, i);
162         if (usedValues.contains(i)) {
163             LOG.debug("Deleting item: {}", i);
164             deleteTx++;
165             cursor.delete(entryId);
166             usedValues.remove(i);
167
168         } else {
169             LOG.debug("Inserting item: {}", i);
170             insertTx++;
171
172             final MapEntryNode entry = ImmutableNodes.mapEntryBuilder().withNodeIdentifier(entryId)
173                     .withChild(ImmutableNodes.leafNode(NUMBER, i)).build();
174             cursor.write(entryId, entry);
175             usedValues.add(i);
176         }
177
178         cursor.close();
179
180         final ListenableFuture<Void> future = tx.submit();
181         if (LOG.isDebugEnabled()) {
182             Futures.addCallback(future, new FutureCallback<Void>() {
183                 @Override
184                 public void onSuccess(final Void result) {
185                     LOG.debug("Future #{} completed successfully", offset);
186                 }
187
188                 @Override
189                 public void onFailure(final Throwable cause) {
190                     LOG.debug("Future #{} failed", offset, cause);
191                 }
192             });
193         }
194
195         return future;
196     }
197
198     private void maybeFinish() {
199         final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
200         if (elapsed >= runtimeNanos) {
201             LOG.debug("Reached max running time, waiting for futures to complete.");
202             scheduledFuture.cancel(false);
203
204             final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
205
206             try {
207                 // Timeout from cds should be 2 minutes so leave some leeway.
208                 allFutures.get(125, TimeUnit.SECONDS);
209
210                 LOG.debug("All futures completed successfully.");
211
212                 final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder()
213                         .setAllTx(allTx)
214                         .setInsertTx(insertTx)
215                         .setDeleteTx(deleteTx)
216                         .build();
217                 completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>success()
218                         .withResult(output).build());
219             } catch (ExecutionException e) {
220                 LOG.error("Write transactions failed.", e.getCause());
221                 completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
222                         .withError(RpcError.ErrorType.APPLICATION, "Submit failed", e.getCause()).build());
223             } catch (InterruptedException | TimeoutException e) {
224                 LOG.error("Write transactions failed.", e);
225                 completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
226                         .withError(RpcError.ErrorType.APPLICATION,
227                                 "Final submit was timed out by the test provider or was interrupted", e).build());
228
229                 for (int i = 0; i < futures.size(); i++) {
230                     final ListenableFuture<Void> future = futures.get(i);
231
232                     try {
233                         future.get(0, TimeUnit.NANOSECONDS);
234                     } catch (TimeoutException fe) {
235                         LOG.warn("Future #{}/{} not completed yet", i, futures.size());
236                     } catch (ExecutionException fe) {
237                         LOG.warn("Future #{}/{} failed", i, futures.size(), e.getCause());
238                     } catch (InterruptedException fe) {
239                         LOG.warn("Interrupted while examining future #{}/{}", i, futures.size(), e);
240                     }
241                 }
242             } catch (Exception e) {
243                 LOG.error("Write transactions failed.", e);
244                 completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
245                         .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
246             }
247
248             executor.shutdown();
249             try {
250                 itemProducer.close();
251             } catch (final DOMDataTreeProducerException e) {
252                 LOG.warn("Failure while closing item producer.", e);
253             }
254         }
255     }
256 }