2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider.impl;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.ArrayList;
17 import java.util.HashSet;
18 import java.util.LinkedHashSet;
19 import java.util.List;
21 import java.util.SplittableRandom;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nullable;
27 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
32 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
33 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
34 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
35 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
36 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
37 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutputBuilder;
38 import org.opendaylight.yangtools.yang.common.QName;
39 import org.opendaylight.yangtools.yang.common.RpcError;
40 import org.opendaylight.yangtools.yang.common.RpcResult;
41 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
44 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
45 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
46 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
50 public class WriteTransactionsHandler implements Runnable {
52 private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class);
53 private static final int SECOND_AS_NANO = 1000000000;
54 //2^20 as in the model
55 private static final int MAX_ITEM = 1048576;
57 private static final QName ID_INTS =
58 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints");
59 private static final QName ID =
60 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id");
61 private static final QName ITEM =
62 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item");
63 private static final QName NUMBER =
64 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number");
66 public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
68 private final DOMDataBroker domDataBroker;
69 private final Long timeToTake;
70 private final Long delay;
71 private final String id;
72 private final WriteTransactionsInput input;
74 private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
75 private final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
76 private final Set<Integer> usedValues = new HashSet<>();
78 private RandomnessProvider random;
79 private TxProvider txProvider;
81 private long startTime;
82 private SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture;
84 private long allTx = 0;
85 private long insertTx = 0;
86 private long deleteTx = 0;
87 private ScheduledFuture<?> scheduledFuture;
88 private YangInstanceIdentifier idListWithKey;
90 public WriteTransactionsHandler(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) {
91 this.domDataBroker = domDataBroker;
94 timeToTake = input.getSeconds() * SECOND_AS_NANO;
95 delay = SECOND_AS_NANO / input.getTransactionsPerSecond();
101 final long current = System.nanoTime();
103 futures.add(execWrite());
105 maybeFinish(current);
108 public void start(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
109 LOG.debug("Starting write-transactions.");
111 if (input.isChainedTransactions()) {
112 txProvider = new TxChainBackedProvider(domDataBroker, settableFuture, executor);
113 random = new BasicProvider();
115 txProvider = new DataBrokerBackedProvider(domDataBroker);
116 random = new NonConflictingProvider();
119 if (ensureListExists(settableFuture) && fillInitialList(settableFuture)) {
120 startTime = System.nanoTime();
121 completionFuture = settableFuture;
122 scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
128 private boolean ensureListExists(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
130 final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INTS, ID, id)
131 .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
133 final MapNode mapNode =
134 ImmutableNodes.mapNodeBuilder(ID_INTS)
138 final DOMDataWriteTransaction tx = txProvider.createTransaction();
139 idListWithKey = ID_INTS_YID.node(entry.getIdentifier());
140 tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, mapNode);
143 tx.submit().checkedGet();
144 } catch (final TransactionCommitFailedException e) {
145 LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
146 settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
147 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
154 private boolean fillInitialList(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
155 LOG.debug("Filling the item list with initial values.");
157 final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM);
158 for (int i = 0; i < MAX_ITEM / 2; i++) {
160 mapBuilder.withChild(ImmutableNodes.mapEntry(ITEM, NUMBER, i));
163 final YangInstanceIdentifier itemListId = idListWithKey.node(ITEM);
164 final DOMDataWriteTransaction tx = txProvider.createTransaction();
165 tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build());
168 tx.submit().checkedGet();
169 } catch (final TransactionCommitFailedException e) {
170 LOG.warn("Unable to fill the initial item list.", e);
171 settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
172 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
179 private CheckedFuture<Void, TransactionCommitFailedException> execWrite() {
180 final int i = random.nextInt(MAX_ITEM + 1);
182 final YangInstanceIdentifier entryId =
183 idListWithKey.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i));
185 final DOMDataWriteTransaction tx = txProvider.createTransaction();
188 if (usedValues.contains(i)) {
189 LOG.debug("Deleting item: {}", i);
191 tx.delete(LogicalDatastoreType.CONFIGURATION, entryId);
192 usedValues.remove(i);
195 LOG.debug("Inserting item: {}", i);
197 final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i);
198 tx.put(LogicalDatastoreType.CONFIGURATION, entryId, entry);
205 private void maybeFinish(final long current) {
206 if ((current - startTime) > timeToTake) {
207 LOG.debug("Reached max running time, waiting for futures to complete.");
208 scheduledFuture.cancel(false);
210 final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
212 Futures.addCallback(allFutures, new FutureCallback<List<Void>>() {
214 public void onSuccess(@Nullable final List<Void> result) {
215 LOG.debug("All futures completed successfully.");
217 final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder()
219 .setInsertTx(insertTx)
220 .setDeleteTx(deleteTx)
223 completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>success()
224 .withResult(output).build());
230 public void onFailure(final Throwable t) {
231 LOG.error("Write transactions failed.", t);
232 completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
233 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", t).build());
241 private interface RandomnessProvider {
242 int nextInt(int bound);
245 private static class NonConflictingProvider implements RandomnessProvider {
247 private final SplittableRandom random = new SplittableRandom();
248 private final LinkedHashSet<Integer> previousNumbers = new LinkedHashSet<>();
251 public int nextInt(int bound) {
254 nextInt = random.nextInt(bound);
255 } while (previousNumbers.contains(nextInt));
257 if (previousNumbers.size() > 100000) {
258 previousNumbers.iterator().remove();
260 previousNumbers.add(nextInt);
266 private static class BasicProvider implements RandomnessProvider {
268 private final SplittableRandom random = new SplittableRandom();
271 public int nextInt(int bound) {
272 return random.nextInt(bound);
276 private interface TxProvider {
278 DOMDataWriteTransaction createTransaction();
281 private static class TxChainBackedProvider implements TxProvider {
283 private final DOMTransactionChain transactionChain;
285 TxChainBackedProvider(final DOMDataBroker dataBroker,
286 final SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture,
287 final ScheduledExecutorService executor) {
290 dataBroker.createTransactionChain(new TestChainListener(completionFuture, executor));
294 public DOMDataWriteTransaction createTransaction() {
295 return transactionChain.newWriteOnlyTransaction();
299 private static class DataBrokerBackedProvider implements TxProvider {
301 private final DOMDataBroker dataBroker;
303 DataBrokerBackedProvider(final DOMDataBroker dataBroker) {
304 this.dataBroker = dataBroker;
308 public DOMDataWriteTransaction createTransaction() {
309 return dataBroker.newWriteOnlyTransaction();
313 private static class TestChainListener implements TransactionChainListener {
315 private final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture;
316 private final ScheduledExecutorService executor;
318 TestChainListener(final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture,
319 final ScheduledExecutorService executor) {
321 this.resultFuture = resultFuture;
322 this.executor = executor;
326 public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
327 final AsyncTransaction<?, ?> transaction,
328 final Throwable cause) {
329 LOG.warn("Transaction chain failed.", cause);
330 resultFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
331 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build());
337 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
338 LOG.debug("Transaction chain closed successfully.");