2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider.impl;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.ArrayList;
17 import java.util.HashSet;
18 import java.util.LinkedHashSet;
19 import java.util.List;
21 import java.util.SplittableRandom;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nullable;
27 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
29 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
34 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
35 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
36 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
37 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
38 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutputBuilder;
39 import org.opendaylight.yangtools.yang.common.QName;
40 import org.opendaylight.yangtools.yang.common.RpcError;
41 import org.opendaylight.yangtools.yang.common.RpcResult;
42 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
44 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
45 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
46 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
47 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 public class WriteTransactionsHandler implements Runnable {
53 private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class);
54 private static final int SECOND_AS_NANO = 1000000000;
55 //2^20 as in the model
56 private static final int MAX_ITEM = 1048576;
58 private static final QName ID_INTS =
59 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints");
60 private static final QName ID =
61 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id");
62 private static final QName ITEM =
63 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item");
64 private static final QName NUMBER =
65 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number");
67 public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
69 private final DOMDataBroker domDataBroker;
70 private final Long timeToTake;
71 private final Long delay;
72 private final String id;
73 private final WriteTransactionsInput input;
75 private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
76 private final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
77 private final Set<Integer> usedValues = new HashSet<>();
79 private RandomnessProvider random;
80 private TxProvider txProvider;
82 private long startTime;
83 private SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture;
85 private long allTx = 0;
86 private long insertTx = 0;
87 private long deleteTx = 0;
88 private ScheduledFuture<?> scheduledFuture;
89 private YangInstanceIdentifier idListWithKey;
91 public WriteTransactionsHandler(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) {
92 this.domDataBroker = domDataBroker;
95 timeToTake = input.getSeconds() * SECOND_AS_NANO;
96 delay = SECOND_AS_NANO / input.getTransactionsPerSecond();
102 final long current = System.nanoTime();
104 futures.add(execWrite());
106 maybeFinish(current);
109 public void start(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
110 LOG.debug("Starting write-transactions.");
112 if (input.isChainedTransactions()) {
113 txProvider = new TxChainBackedProvider(domDataBroker, settableFuture, executor);
114 random = new BasicProvider();
116 txProvider = new DataBrokerBackedProvider(domDataBroker);
117 random = new NonConflictingProvider();
120 if (ensureListExists(settableFuture) && fillInitialList(settableFuture)) {
121 startTime = System.nanoTime();
122 completionFuture = settableFuture;
123 scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
129 private boolean ensureListExists(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
131 final MapNode mapNode = ImmutableNodes.mapNodeBuilder(ID_INTS).build();
133 DOMDataWriteTransaction tx = txProvider.createTransaction();
134 // write only the top list
135 tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, mapNode);
137 tx.submit().checkedGet();
138 } catch (final OptimisticLockFailedException e) {
139 // when multiple write-transactions are executed concurrently we need to ignore this.
140 // If we get optimistic lock here it means id-ints already exists and we can continue.
141 LOG.debug("Got an optimistic lock when writing initial top level list element.", e);
142 } catch (final TransactionCommitFailedException e) {
143 LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
144 settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
145 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
149 final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INTS, ID, id)
150 .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
153 idListWithKey = ID_INTS_YID.node(entry.getIdentifier());
154 tx = txProvider.createTransaction();
155 tx.merge(LogicalDatastoreType.CONFIGURATION, idListWithKey, entry);
158 tx.submit().checkedGet();
159 } catch (final TransactionCommitFailedException e) {
160 LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
161 settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
162 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
169 private boolean fillInitialList(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
170 LOG.debug("Filling the item list with initial values.");
172 final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM);
173 for (int i = 0; i < MAX_ITEM / 2; i++) {
175 mapBuilder.withChild(ImmutableNodes.mapEntry(ITEM, NUMBER, i));
178 final YangInstanceIdentifier itemListId = idListWithKey.node(ITEM);
179 final DOMDataWriteTransaction tx = txProvider.createTransaction();
180 tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build());
183 tx.submit().checkedGet();
184 } catch (final TransactionCommitFailedException e) {
185 LOG.warn("Unable to fill the initial item list.", e);
186 settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
187 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
194 private CheckedFuture<Void, TransactionCommitFailedException> execWrite() {
195 final int i = random.nextInt(MAX_ITEM + 1);
197 final YangInstanceIdentifier entryId =
198 idListWithKey.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i));
200 final DOMDataWriteTransaction tx = txProvider.createTransaction();
203 if (usedValues.contains(i)) {
204 LOG.debug("Deleting item: {}", i);
206 tx.delete(LogicalDatastoreType.CONFIGURATION, entryId);
207 usedValues.remove(i);
210 LOG.debug("Inserting item: {}", i);
212 final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i);
213 tx.put(LogicalDatastoreType.CONFIGURATION, entryId, entry);
220 private void maybeFinish(final long current) {
221 if ((current - startTime) > timeToTake) {
222 LOG.debug("Reached max running time, waiting for futures to complete.");
223 scheduledFuture.cancel(false);
225 final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
227 Futures.addCallback(allFutures, new FutureCallback<List<Void>>() {
229 public void onSuccess(@Nullable final List<Void> result) {
230 LOG.debug("All futures completed successfully.");
232 final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder()
234 .setInsertTx(insertTx)
235 .setDeleteTx(deleteTx)
238 completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>success()
239 .withResult(output).build());
245 public void onFailure(final Throwable t) {
246 LOG.error("Write transactions failed.", t);
247 completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
248 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", t).build());
256 private interface RandomnessProvider {
257 int nextInt(int bound);
260 private static class NonConflictingProvider implements RandomnessProvider {
262 private final SplittableRandom random = new SplittableRandom();
263 private final LinkedHashSet<Integer> previousNumbers = new LinkedHashSet<>();
266 public int nextInt(int bound) {
269 nextInt = random.nextInt(bound);
270 } while (previousNumbers.contains(nextInt));
272 if (previousNumbers.size() > 100000) {
273 previousNumbers.iterator().remove();
275 previousNumbers.add(nextInt);
281 private static class BasicProvider implements RandomnessProvider {
283 private final SplittableRandom random = new SplittableRandom();
286 public int nextInt(int bound) {
287 return random.nextInt(bound);
291 private interface TxProvider {
293 DOMDataWriteTransaction createTransaction();
296 private static class TxChainBackedProvider implements TxProvider {
298 private final DOMTransactionChain transactionChain;
300 TxChainBackedProvider(final DOMDataBroker dataBroker,
301 final SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture,
302 final ScheduledExecutorService executor) {
305 dataBroker.createTransactionChain(new TestChainListener(completionFuture, executor));
309 public DOMDataWriteTransaction createTransaction() {
310 return transactionChain.newWriteOnlyTransaction();
314 private static class DataBrokerBackedProvider implements TxProvider {
316 private final DOMDataBroker dataBroker;
318 DataBrokerBackedProvider(final DOMDataBroker dataBroker) {
319 this.dataBroker = dataBroker;
323 public DOMDataWriteTransaction createTransaction() {
324 return dataBroker.newWriteOnlyTransaction();
328 private static class TestChainListener implements TransactionChainListener {
330 private final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture;
331 private final ScheduledExecutorService executor;
333 TestChainListener(final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture,
334 final ScheduledExecutorService executor) {
336 this.resultFuture = resultFuture;
337 this.executor = executor;
341 public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
342 final AsyncTransaction<?, ?> transaction,
343 final Throwable cause) {
344 LOG.warn("Transaction chain failed.", cause);
345 resultFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
346 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build());
352 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
353 LOG.debug("Transaction chain closed successfully.");