/*
 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.clustering.it.provider.impl;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.SplittableRandom;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutputBuilder;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
55 public class WriteTransactionsHandler implements Runnable {
57 private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class);
58 private static final int SECOND_AS_NANO = 1000000000;
59 //2^20 as in the model
60 private static final int MAX_ITEM = 1048576;
62 private static final QName ID_INTS =
63 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints");
64 private static final QName ID_INT =
65 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int");
66 private static final QName ID =
67 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id");
68 private static final QName ITEM =
69 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item");
70 private static final QName NUMBER =
71 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number");
73 public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
74 public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT);
76 private final DOMDataBroker domDataBroker;
77 private final Long timeToTake;
78 private final Long delay;
79 private final String id;
80 private final WriteTransactionsInput input;
82 private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
83 private final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
84 private final Set<Integer> usedValues = new HashSet<>();
86 private RandomnessProvider random;
87 private TxProvider txProvider;
89 private long startTime;
90 private SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture;
92 private long allTx = 0;
93 private long insertTx = 0;
94 private long deleteTx = 0;
95 private ScheduledFuture<?> scheduledFuture;
96 private YangInstanceIdentifier idListWithKey;
/**
 * Creates a handler for one write-transactions request.
 *
 * <p>The running time is {@code input.getSeconds()} converted to nanoseconds and the period between
 * writes is one second (in nanoseconds) divided by {@code input.getTransactionsPerSecond()}.
 *
 * @param domDataBroker broker used to create the write transactions
 * @param input RPC input carrying the duration, the rate and the list key
 */
public WriteTransactionsHandler(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) {
    this.domDataBroker = domDataBroker;
    // Kept because start() reads the chained-transactions flag from it.
    this.input = input;

    timeToTake = input.getSeconds() * SECOND_AS_NANO;
    // NOTE(review): a rate of zero fails here on the division - presumably the model forbids it; confirm.
    delay = SECOND_AS_NANO / input.getTransactionsPerSecond();
    // NOTE(review): getId() is assumed to be the generated accessor for the request's id leaf; confirm
    // against the binding. The value becomes the key of the id-int entry built in ensureListExists().
    id = input.getId();
}
109 final long current = System.nanoTime();
111 futures.add(execWrite());
113 maybeFinish(current);
116 public void start(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
117 LOG.debug("Starting write-transactions.");
119 if (input.isChainedTransactions()) {
120 txProvider = new TxChainBackedProvider(domDataBroker, settableFuture, executor);
121 random = new BasicProvider();
123 txProvider = new DataBrokerBackedProvider(domDataBroker);
124 random = new NonConflictingProvider();
127 if (ensureListExists(settableFuture) && fillInitialList(settableFuture)) {
128 startTime = System.nanoTime();
129 completionFuture = settableFuture;
130 scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
136 private boolean ensureListExists(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
138 final ContainerNode containerNode = ImmutableContainerNodeBuilder.create()
139 .withNodeIdentifier(new NodeIdentifier(ID_INTS))
140 .withChild(ImmutableNodes.mapNodeBuilder(ID_INT).build())
143 DOMDataWriteTransaction tx = txProvider.createTransaction();
144 // write only the top list
145 tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, containerNode);
147 tx.submit().checkedGet();
148 } catch (final OptimisticLockFailedException e) {
149 // when multiple write-transactions are executed concurrently we need to ignore this.
150 // If we get optimistic lock here it means id-ints already exists and we can continue.
151 LOG.debug("Got an optimistic lock when writing initial top level list element.", e);
152 } catch (final TransactionCommitFailedException e) {
153 LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
154 settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
155 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
159 final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id)
160 .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
163 idListWithKey = ID_INT_YID.node(entry.getIdentifier());
164 tx = txProvider.createTransaction();
165 tx.merge(LogicalDatastoreType.CONFIGURATION, idListWithKey, entry);
168 tx.submit().checkedGet();
169 } catch (final TransactionCommitFailedException e) {
170 LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
171 settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
172 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
179 private boolean fillInitialList(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
180 LOG.debug("Filling the item list with initial values.");
182 final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM);
183 for (int i = 0; i < MAX_ITEM / 2; i++) {
185 mapBuilder.withChild(ImmutableNodes.mapEntry(ITEM, NUMBER, i));
188 final YangInstanceIdentifier itemListId = idListWithKey.node(ITEM);
189 final DOMDataWriteTransaction tx = txProvider.createTransaction();
190 tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build());
193 tx.submit().checkedGet();
194 } catch (final TransactionCommitFailedException e) {
195 LOG.warn("Unable to fill the initial item list.", e);
196 settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
197 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
204 private CheckedFuture<Void, TransactionCommitFailedException> execWrite() {
205 final int i = random.nextInt(MAX_ITEM + 1);
207 final YangInstanceIdentifier entryId =
208 idListWithKey.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i));
210 final DOMDataWriteTransaction tx = txProvider.createTransaction();
213 if (usedValues.contains(i)) {
214 LOG.debug("Deleting item: {}", i);
216 tx.delete(LogicalDatastoreType.CONFIGURATION, entryId);
217 usedValues.remove(i);
220 LOG.debug("Inserting item: {}", i);
222 final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i);
223 tx.put(LogicalDatastoreType.CONFIGURATION, entryId, entry);
230 private void maybeFinish(final long current) {
231 if ((current - startTime) > timeToTake) {
232 LOG.debug("Reached max running time, waiting for futures to complete.");
233 scheduledFuture.cancel(false);
235 final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
237 Futures.addCallback(allFutures, new FutureCallback<List<Void>>() {
239 public void onSuccess(@Nullable final List<Void> result) {
240 LOG.debug("All futures completed successfully.");
242 final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder()
244 .setInsertTx(insertTx)
245 .setDeleteTx(deleteTx)
248 completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>success()
249 .withResult(output).build());
255 public void onFailure(final Throwable t) {
256 LOG.error("Write transactions failed.", t);
257 completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
258 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", t).build());
266 private interface RandomnessProvider {
267 int nextInt(int bound);
270 private static class NonConflictingProvider implements RandomnessProvider {
272 private final SplittableRandom random = new SplittableRandom();
273 private final LinkedHashSet<Integer> previousNumbers = new LinkedHashSet<>();
276 public int nextInt(int bound) {
279 nextInt = random.nextInt(bound);
280 } while (previousNumbers.contains(nextInt));
282 if (previousNumbers.size() > 100000) {
283 previousNumbers.iterator().remove();
285 previousNumbers.add(nextInt);
291 private static class BasicProvider implements RandomnessProvider {
293 private final SplittableRandom random = new SplittableRandom();
296 public int nextInt(int bound) {
297 return random.nextInt(bound);
301 private interface TxProvider {
303 DOMDataWriteTransaction createTransaction();
306 private static class TxChainBackedProvider implements TxProvider {
308 private final DOMTransactionChain transactionChain;
310 TxChainBackedProvider(final DOMDataBroker dataBroker,
311 final SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture,
312 final ScheduledExecutorService executor) {
315 dataBroker.createTransactionChain(new TestChainListener(completionFuture, executor));
319 public DOMDataWriteTransaction createTransaction() {
320 return transactionChain.newWriteOnlyTransaction();
324 private static class DataBrokerBackedProvider implements TxProvider {
326 private final DOMDataBroker dataBroker;
328 DataBrokerBackedProvider(final DOMDataBroker dataBroker) {
329 this.dataBroker = dataBroker;
333 public DOMDataWriteTransaction createTransaction() {
334 return dataBroker.newWriteOnlyTransaction();
338 private static class TestChainListener implements TransactionChainListener {
340 private final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture;
341 private final ScheduledExecutorService executor;
343 TestChainListener(final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture,
344 final ScheduledExecutorService executor) {
346 this.resultFuture = resultFuture;
347 this.executor = executor;
351 public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
352 final AsyncTransaction<?, ?> transaction,
353 final Throwable cause) {
354 LOG.warn("Transaction chain failed.", cause);
355 resultFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
356 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build());
362 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
363 LOG.debug("Transaction chain closed successfully.");