917f6857694bc780b827f0273ddffd8907f7bbe5
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / impl / ProduceTransactionsHandler.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider.impl;
10
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Set;
21 import java.util.SplittableRandom;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nullable;
27 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
28 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
34 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
35 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
36 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
37 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutputBuilder;
38 import org.opendaylight.yangtools.yang.common.QName;
39 import org.opendaylight.yangtools.yang.common.RpcError;
40 import org.opendaylight.yangtools.yang.common.RpcResult;
41 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
45 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
46 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
47 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
48 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
49 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
50 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 public class ProduceTransactionsHandler implements Runnable {
55
56     private static final Logger LOG = LoggerFactory.getLogger(ProduceTransactionsHandler.class);
57     private static final int SECOND_AS_NANO = 1000000000;
58     //2^20 as in the model
59     private static final int MAX_ITEM = 1048576;
60
61     private static final QName ID_INTS =
62             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints");
63     private static final QName ID_INT =
64             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int");
65     private static final QName ID =
66             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id");
67     private static final QName ITEM =
68             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item");
69     private static final QName NUMBER =
70             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number");
71
72     public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
73     public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT);
74
75     private final DOMDataTreeService domDataTreeService;
76
77     private final long timeToTake;
78     private final long delay;
79     private final String id;
80
81     private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
82     private final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
83     private final Set<Integer> usedValues = new HashSet<>();
84     private final SplittableRandom random = new SplittableRandom();
85
86     private long startTime;
87     private SettableFuture<RpcResult<ProduceTransactionsOutput>> completionFuture;
88
89     private long allTx = 0;
90     private long insertTx = 0;
91     private long deleteTx = 0;
92     private ScheduledFuture<?> scheduledFuture;
93     private YangInstanceIdentifier idListWithKey;
94     private DOMDataTreeProducer itemProducer;
95
96     public ProduceTransactionsHandler(final DOMDataTreeService domDataTreeService,
97                                       final ProduceTransactionsInput input) {
98
99         this.domDataTreeService = domDataTreeService;
100
101         timeToTake = input.getSeconds() * SECOND_AS_NANO;
102         delay = SECOND_AS_NANO / input.getTransactionsPerSecond();
103         id = input.getId();
104     }
105
106     @Override
107     public void run() {
108         final long current = System.nanoTime();
109
110         futures.add(execWrite());
111
112         maybeFinish(current);
113     }
114
115     public void start(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
116         completionFuture = settableFuture;
117
118         if (ensureListExists(completionFuture) && fillInitialList(completionFuture)) {
119             startTime = System.nanoTime();
120             scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
121         } else {
122             executor.shutdown();
123         }
124     }
125
126     private boolean ensureListExists(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
127
128         final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id)
129                 .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
130                 .build();
131         final MapNode mapNode =
132                 ImmutableNodes.mapNodeBuilder(ID_INT)
133                         .withChild(entry)
134                         .build();
135
136         final ContainerNode containerNode = ImmutableContainerNodeBuilder.create()
137                 .withNodeIdentifier(new NodeIdentifier(ID_INTS))
138                 .withChild(mapNode)
139                 .build();
140
141         final DOMDataTreeProducer producer = domDataTreeService.createProducer(Collections.singleton(
142                 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY)));
143
144         final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(false);
145
146         final DOMDataTreeWriteCursor cursor =
147                 tx.createCursor(new DOMDataTreeIdentifier(
148                         LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY));
149
150         idListWithKey = ID_INT_YID.node(entry.getIdentifier());
151
152         cursor.merge(containerNode.getIdentifier(), containerNode);
153         cursor.close();
154
155         try {
156             tx.submit().checkedGet();
157         } catch (TransactionCommitFailedException e) {
158             LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
159             settableFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
160                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
161             return false;
162         } finally {
163             try {
164                 producer.close();
165             } catch (DOMDataTreeProducerException e) {
166                 LOG.warn("Error while closing producer.", e);
167             }
168         }
169
170         return true;
171     }
172
173     private boolean fillInitialList(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
174         LOG.debug("Filling the item list with initial values.");
175
176         final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM);
177         for (int i = 0; i < MAX_ITEM / 2; i++) {
178             usedValues.add(i);
179             mapBuilder.withChild(ImmutableNodes.mapEntry(ITEM, NUMBER, i));
180         }
181
182         itemProducer = domDataTreeService.createProducer(
183                 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)));
184
185         final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
186         final DOMDataTreeWriteCursor cursor =
187                 tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey));
188
189         final MapNode list = mapBuilder.build();
190         cursor.write(list.getIdentifier(), list);
191         cursor.close();
192
193         try {
194             tx.submit().checkedGet();
195         } catch (final TransactionCommitFailedException e) {
196             LOG.warn("Unable to fill the initial item list.", e);
197             settableFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
198                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
199             return false;
200         }
201
202         return true;
203     }
204
205     private CheckedFuture<Void, TransactionCommitFailedException> execWrite() {
206         final int i = random.nextInt(MAX_ITEM + 1);
207
208         final YangInstanceIdentifier entryId =
209                 idListWithKey.node(ITEM).node(new NodeIdentifierWithPredicates(ITEM, NUMBER, i));
210
211         final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
212         final DOMDataTreeWriteCursor cursor = tx.createCursor(
213                 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(ITEM)));
214         allTx++;
215
216         if (usedValues.contains(i)) {
217             LOG.debug("Deleting item: {}", i);
218             deleteTx++;
219             cursor.delete(entryId.getLastPathArgument());
220             usedValues.remove(i);
221
222         } else {
223             LOG.debug("Inserting item: {}", i);
224             insertTx++;
225             final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i);
226             cursor.write(entryId.getLastPathArgument(), entry);
227             usedValues.add(i);
228         }
229
230         cursor.close();
231
232         return tx.submit();
233     }
234
235     private void maybeFinish(final long current) {
236         if ((current - startTime) > timeToTake) {
237             LOG.debug("Reached max running time, waiting for futures to complete.");
238             scheduledFuture.cancel(false);
239
240             final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
241
242             Futures.addCallback(allFutures, new FutureCallback<List<Void>>() {
243                 @Override
244                 public void onSuccess(@Nullable final List<Void> result) {
245                     LOG.debug("All futures completed successfully.");
246
247                     final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder()
248                             .setAllTx(allTx)
249                             .setInsertTx(insertTx)
250                             .setDeleteTx(deleteTx)
251                             .build();
252
253                     completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>success()
254                             .withResult(output).build());
255
256                     executor.shutdown();
257                 }
258
259                 @Override
260                 public void onFailure(final Throwable t) {
261                     LOG.error("Write transactions failed.", t);
262                     completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
263                             .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", t).build());
264
265                     executor.shutdown();
266                 }
267             });
268         }
269     }
270 }