ee46a74746c56c52451fa091746a9ed1e1b566af
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / impl / ProduceTransactionsHandler.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider.impl;
10
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Set;
21 import java.util.SplittableRandom;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nullable;
27 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
28 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
34 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
35 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
36 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
37 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutputBuilder;
38 import org.opendaylight.yangtools.yang.common.QName;
39 import org.opendaylight.yangtools.yang.common.RpcError;
40 import org.opendaylight.yangtools.yang.common.RpcResult;
41 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
44 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
45 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
46 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 public class ProduceTransactionsHandler implements Runnable {
51
52     private static final Logger LOG = LoggerFactory.getLogger(ProduceTransactionsHandler.class);
53     private static final int SECOND_AS_NANO = 1000000000;
54     //2^20 as in the model
55     private static final int MAX_ITEM = 1048576;
56
57     private static final QName ID_INTS =
58             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints");
59     private static final QName ID =
60             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id");
61     private static final QName ITEM =
62             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item");
63     private static final QName NUMBER =
64             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number");
65
66     public static final YangInstanceIdentifier ID_INTS_YID =
67             YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(ID_INTS));
68
69     private final DOMDataTreeService domDataTreeService;
70
71     private final long timeToTake;
72     private final long delay;
73     private final String id;
74
75     private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
76     private final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
77     private final Set<Integer> usedValues = new HashSet<>();
78     private final SplittableRandom random = new SplittableRandom();
79
80     private long startTime;
81     private SettableFuture<RpcResult<ProduceTransactionsOutput>> completionFuture;
82
83     private long allTx = 0;
84     private long insertTx = 0;
85     private long deleteTx = 0;
86     private ScheduledFuture<?> scheduledFuture;
87     private YangInstanceIdentifier idListWithKey;
88     private DOMDataTreeProducer itemProducer;
89
90     public ProduceTransactionsHandler(final DOMDataTreeService domDataTreeService,
91                                       final ProduceTransactionsInput input) {
92
93         this.domDataTreeService = domDataTreeService;
94
95         timeToTake = input.getSeconds() * SECOND_AS_NANO;
96         delay = SECOND_AS_NANO / input.getTransactionsPerSecond();
97         id = input.getId();
98     }
99
100     @Override
101     public void run() {
102         final long current = System.nanoTime();
103
104         futures.add(execWrite());
105
106         maybeFinish(current);
107     }
108
109     public void start(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
110
111         if (ensureListExists(completionFuture) && fillInitialList(completionFuture)) {
112             startTime = System.nanoTime();
113             completionFuture = settableFuture;
114             scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
115         } else {
116             executor.shutdown();
117         }
118     }
119
120     private boolean ensureListExists(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
121
122         final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INTS, ID, id)
123                 .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
124                 .build();
125         final MapNode mapNode =
126                 ImmutableNodes.mapNodeBuilder(ID_INTS)
127                         .withChild(entry)
128                         .build();
129
130         final DOMDataTreeProducer producer = domDataTreeService.createProducer(Collections.singleton(
131                 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY)));
132
133         final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(false);
134
135         final DOMDataTreeWriteCursor cursor =
136                 tx.createCursor(new DOMDataTreeIdentifier(
137                         LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY));
138
139         idListWithKey = ID_INTS_YID.node(entry.getIdentifier());
140
141         cursor.merge(mapNode.getIdentifier(), mapNode);
142         cursor.close();
143
144         try {
145             tx.submit().checkedGet();
146         } catch (TransactionCommitFailedException e) {
147             LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
148             settableFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
149                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
150             return false;
151         } finally {
152             try {
153                 producer.close();
154             } catch (DOMDataTreeProducerException e) {
155                 LOG.warn("Error while closing producer.", e);
156             }
157         }
158
159         return true;
160     }
161
162     private boolean fillInitialList(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
163         LOG.debug("Filling the item list with initial values.");
164
165         final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM);
166         for (int i = 0; i < MAX_ITEM / 2; i++) {
167             usedValues.add(i);
168             mapBuilder.withChild(ImmutableNodes.mapEntry(ITEM, NUMBER, i));
169         }
170
171         itemProducer = domDataTreeService.createProducer(
172                 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)));
173
174         final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
175         final DOMDataTreeWriteCursor cursor =
176                 tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey));
177
178         final MapNode list = mapBuilder.build();
179         cursor.write(list.getIdentifier(), list);
180         cursor.close();
181
182         try {
183             tx.submit().checkedGet();
184         } catch (final TransactionCommitFailedException e) {
185             LOG.warn("Unable to fill the initial item list.", e);
186             settableFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
187                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
188             return false;
189         }
190
191         return true;
192     }
193
194     private CheckedFuture<Void, TransactionCommitFailedException> execWrite() {
195         final int i = random.nextInt(MAX_ITEM + 1);
196
197         final YangInstanceIdentifier entryId =
198                 idListWithKey.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i));
199
200         final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
201         final DOMDataTreeWriteCursor cursor = tx.createCursor(
202                 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(ITEM)));
203         allTx++;
204
205         if (usedValues.contains(i)) {
206             LOG.debug("Deleting item: {}", i);
207             deleteTx++;
208             cursor.delete(entryId.getLastPathArgument());
209             usedValues.remove(i);
210
211         } else {
212             LOG.debug("Inserting item: {}", i);
213             insertTx++;
214             final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i);
215             cursor.write(entryId.getLastPathArgument(), entry);
216             usedValues.add(i);
217         }
218
219         cursor.close();
220
221         return tx.submit();
222     }
223
224     private void maybeFinish(final long current) {
225         if ((current - startTime) > timeToTake) {
226             LOG.debug("Reached max running time, waiting for futures to complete.");
227             scheduledFuture.cancel(false);
228
229             final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
230
231             Futures.addCallback(allFutures, new FutureCallback<List<Void>>() {
232                 @Override
233                 public void onSuccess(@Nullable final List<Void> result) {
234                     LOG.debug("All futures completed successfully.");
235
236                     final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder()
237                             .setAllTx(allTx)
238                             .setInsertTx(insertTx)
239                             .setDeleteTx(deleteTx)
240                             .build();
241
242                     completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>success()
243                             .withResult(output).build());
244
245                     executor.shutdown();
246                 }
247
248                 @Override
249                 public void onFailure(final Throwable t) {
250                     LOG.error("Write transactions failed.", t);
251                     completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
252                             .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", t).build());
253
254                     executor.shutdown();
255                 }
256             });
257         }
258     }
259 }