2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider.impl;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.SettableFuture;
15 import java.util.ArrayList;
16 import java.util.HashSet;
17 import java.util.LinkedHashSet;
18 import java.util.List;
20 import java.util.SplittableRandom;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
29 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
34 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
35 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
36 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
37 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
38 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutputBuilder;
39 import org.opendaylight.yangtools.yang.common.QName;
40 import org.opendaylight.yangtools.yang.common.RpcError;
41 import org.opendaylight.yangtools.yang.common.RpcResult;
42 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
46 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
47 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
48 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
49 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
50 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
54 public class WriteTransactionsHandler implements Runnable {
// Logger shared by the handler and its nested helper classes.
56 private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class);
// Nanoseconds in one second; the constructor uses it to derive the run duration and the
// scheduling period from the RPC input.
57 private static final int SECOND_AS_NANO = 1000000000;
58 //2^20 as in the model
// Also sizes the workload: the initial fill writes MAX_ITEM / 2 entries and random item
// numbers are drawn with an exclusive bound of MAX_ITEM + 1.
59 private static final int MAX_ITEM = 1048576;
// The QNames below all use the "...:lowlevel:target" namespace with revision 2017-02-15.
// "id-ints": identifier of the top-level container node built in ensureListExists.
61 private static final QName ID_INTS =
62 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints");
// "id-int": the map placed inside the id-ints container.
63 private static final QName ID_INT =
64 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int");
// "id": key of an id-int map entry (the entry is built with this handler's id as the key value).
65 private static final QName ID =
66 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id");
// "item": the map nested inside each id-int entry.
67 private static final QName ITEM =
68 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item");
// "number": key of an item map entry.
69 private static final QName NUMBER =
70 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number");
// Path to the id-ints container.
72 public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
// Path to the id-int map below the id-ints container.
73 public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT);
// Broker used to create write transactions, either directly or through a transaction chain.
75 private final DOMDataBroker domDataBroker;
// Total run time in nanoseconds (requested seconds * SECOND_AS_NANO).
76 private final Long timeToTake;
// Period between scheduled runs in nanoseconds (SECOND_AS_NANO / transactions per second).
77 private final Long delay;
// Key value of this handler's id-int entry.
// NOTE(review): the assignment of this final field is not visible in this view - confirm it
// is initialised from the RPC input.
78 private final String id;
// RPC input; start() reads isChainedTransactions() from it.
// NOTE(review): the assignment of this final field is not visible in this view.
79 private final WriteTransactionsInput input;
// Single-threaded scheduler that runs this handler at a fixed rate; it is also handed to the
// transaction chain listener.
81 private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
// Commit futures of the submitted write transactions; maybeFinish waits on all of them.
82 private final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
// Item numbers tracked by the handler; execWrite deletes an item when its number is in this
// set and removes the number afterwards.
// NOTE(review): Set is used here but no java.util.Set import is visible among the imports.
83 private final Set<Integer> usedValues = new HashSet<>();
// Source of random item numbers; chosen in start().
85 private RandomnessProvider random;
// Factory for write transactions; chosen in start().
86 private TxProvider txProvider;
// System.nanoTime() value captured just before the periodic task is scheduled.
88 private long startTime;
// Future completed with the RPC result once the run finishes or fails.
89 private SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture;
// Transaction counters; insertTx and deleteTx are copied into the RPC output by maybeFinish.
91 private long allTx = 0;
92 private long insertTx = 0;
93 private long deleteTx = 0;
// Handle of the fixed-rate task, used to cancel it when the run time is over.
94 private ScheduledFuture<?> scheduledFuture;
// Path of this handler's id-int entry; set by ensureListExists.
95 private YangInstanceIdentifier idListWithKey;
/**
 * Creates a handler for one write-transactions request and precomputes its timing values.
 *
 * @param domDataBroker broker used later to create the write transactions
 * @param input RPC input supplying the duration in seconds and the transactions-per-second rate
 */
97 public WriteTransactionsHandler(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) {
98 this.domDataBroker = domDataBroker;
// Requested duration converted from seconds to nanoseconds.
101 timeToTake = input.getSeconds() * SECOND_AS_NANO;
// Scheduling period in nanoseconds.
// NOTE(review): presumably an integer division - a transactions-per-second value of 0 would
// throw ArithmeticException here; confirm the model rules that out.
102 delay = SECOND_AS_NANO / input.getTransactionsPerSecond();
// NOTE(review): the header of the enclosing method is not visible in this view; presumably
// this is the Runnable.run() body that start() schedules at a fixed rate - confirm.
// Timestamp taken before the write is issued; maybeFinish compares it against startTime.
108 final long current = System.nanoTime();
// Issue one write transaction and remember its commit future.
110 futures.add(execWrite());
// Stop the run and publish the result once the configured time has elapsed.
112 maybeFinish(current);
/**
 * Selects the transaction and randomness providers, prepares the datastore and, if that
 * succeeds, schedules this handler on the executor at a fixed rate.
 *
 * @param settableFuture future that receives the RPC result; it is stored as the completion
 *                       future only when both preparation steps succeed
 */
115 public void start(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
116 LOG.debug("Starting write-transactions.");
// Chained mode: transactions come from a transaction chain and plain random numbers are used.
118 if (input.isChainedTransactions()) {
119 txProvider = new TxChainBackedProvider(domDataBroker, settableFuture, executor);
120 random = new BasicProvider();
// NOTE(review): the branch separator is not visible in this view; presumably the two lines
// below form the non-chained alternative (standalone transactions, non-repeating numbers).
122 txProvider = new DataBrokerBackedProvider(domDataBroker);
123 random = new NonConflictingProvider();
// Both helpers report failures through settableFuture themselves; fillInitialList is only
// attempted when ensureListExists returned true.
126 if (ensureListExists(settableFuture) && fillInitialList(settableFuture)) {
127 startTime = System.nanoTime();
128 completionFuture = settableFuture;
// First run immediately, then one run every `delay` nanoseconds.
129 scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
/**
 * Merges the id-ints container and then this handler's id-int entry into the CONFIGURATION
 * datastore, waiting up to 125 seconds for each commit. On a commit failure or timeout the
 * supplied future is completed with a failed RPC result.
 *
 * @param settableFuture future completed with a failed result when a commit fails or times out
 * @return NOTE(review): the return statements are not visible in this view; start() treats
 *         the result as a "may proceed" flag - confirm
 */
135 private boolean ensureListExists(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
// id-ints container holding an empty id-int map.
137 final ContainerNode containerNode = ImmutableContainerNodeBuilder.create()
138 .withNodeIdentifier(new NodeIdentifier(ID_INTS))
139 .withChild(ImmutableNodes.mapNodeBuilder(ID_INT).build())
142 DOMDataWriteTransaction tx = txProvider.createTransaction();
143 // write only the top list
144 tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, containerNode);
// Bounded blocking wait for the commit.
146 tx.submit().checkedGet(125, TimeUnit.SECONDS);
147 } catch (final OptimisticLockFailedException e) {
148 // when multiple write-transactions are executed concurrently we need to ignore this.
149 // If we get optimistic lock here it means id-ints already exists and we can continue.
150 LOG.debug("Got an optimistic lock when writing initial top level list element.", e);
151 } catch (final TransactionCommitFailedException | TimeoutException e) {
152 LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
153 settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
154 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
// id-int entry keyed by this handler's id, containing an empty item map.
158 final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id)
159 .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
// Remember the path of the entry; the later item writes are addressed relative to it.
162 idListWithKey = ID_INT_YID.node(entry.getIdentifier());
163 tx = txProvider.createTransaction();
164 tx.merge(LogicalDatastoreType.CONFIGURATION, idListWithKey, entry);
// Bounded blocking wait for the second commit.
167 tx.submit().checkedGet(125, TimeUnit.SECONDS);
168 } catch (final TransactionCommitFailedException | TimeoutException e) {
169 LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
170 settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
171 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
/**
 * Writes the initial item map - entries numbered 0 to MAX_ITEM / 2 - 1 - below this handler's
 * id-int entry in a single transaction and waits up to 125 seconds for the commit. On a
 * commit failure or timeout the supplied future is completed with a failed RPC result.
 *
 * @param settableFuture future completed with a failed result when the commit fails or times out
 * @return NOTE(review): the return statements are not visible in this view; start() treats
 *         the result as a "may proceed" flag - confirm
 */
178 private boolean fillInitialList(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
179 LOG.debug("Filling the item list with initial values.");
181 final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM);
// One item entry per number in the lower half of the value range.
182 for (int i = 0; i < MAX_ITEM / 2; i++) {
184 mapBuilder.withChild(ImmutableNodes.mapEntry(ITEM, NUMBER, i));
// The whole map is put at <id-int entry>/item; idListWithKey was set by ensureListExists.
187 final YangInstanceIdentifier itemListId = idListWithKey.node(ITEM);
188 final DOMDataWriteTransaction tx = txProvider.createTransaction();
189 tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build());
// Bounded blocking wait for the commit.
192 tx.submit().checkedGet(125, TimeUnit.SECONDS);
193 } catch (final TransactionCommitFailedException | TimeoutException e) {
194 LOG.warn("Unable to fill the initial item list.", e);
195 settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
196 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
/**
 * Picks a random item number and issues one write transaction for it: a delete when the
 * number is tracked in usedValues, a put of a fresh item entry otherwise.
 *
 * @return NOTE(review): the return statement is not visible in this view; presumably the
 *         commit future of the transaction created here - confirm
 */
203 private CheckedFuture<Void, TransactionCommitFailedException> execWrite() {
// Exclusive bound MAX_ITEM + 1, so MAX_ITEM itself can be drawn.
204 final int i = random.nextInt(MAX_ITEM + 1);
// Path of the item entry keyed by the drawn number, below this handler's id-int entry.
206 final YangInstanceIdentifier entryId =
207 idListWithKey.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i));
209 final DOMDataWriteTransaction tx = txProvider.createTransaction();
// Tracked number: delete the item and stop tracking it.
212 if (usedValues.contains(i)) {
213 LOG.debug("Deleting item: {}", i);
215 tx.delete(LogicalDatastoreType.CONFIGURATION, entryId);
216 usedValues.remove(i);
// NOTE(review): the branch separator is not visible in this view; presumably the lines below
// are the alternative taken when the number is not tracked.
219 LOG.debug("Inserting item: {}", i);
221 final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i);
222 tx.put(LogicalDatastoreType.CONFIGURATION, entryId, entry);
/**
 * Ends the run once more than timeToTake nanoseconds have passed since startTime: cancels the
 * periodic task, waits up to 125 seconds for all collected commit futures and completes the
 * completion future with either the transaction counts or a failed RPC result.
 *
 * @param current System.nanoTime() reading to compare against startTime
 */
229 private void maybeFinish(final long current) {
230 if ((current - startTime) > timeToTake) {
231 LOG.debug("Reached max running time, waiting for futures to complete.");
// cancel(false): no further runs are started, a run in progress is not interrupted.
232 scheduledFuture.cancel(false);
// Combined future over every commit future collected so far.
234 final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
237 // Timeout from cds should be 2 minutes so leave some leeway.
238 allFutures.get(125, TimeUnit.SECONDS);
240 LOG.debug("All futures completed successfully.");
// Report the counters gathered during the run.
242 final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder()
244 .setInsertTx(insertTx)
245 .setDeleteTx(deleteTx)
248 completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>success()
249 .withResult(output).build());
// NOTE(review): InterruptedException is handled like any other failure and the visible lines
// do not restore the interrupt flag - consider Thread.currentThread().interrupt().
252 } catch (InterruptedException | ExecutionException | TimeoutException exception) {
253 LOG.error("Write transactions failed.", exception);
254 completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
255 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", exception).build());
// Source of the random item numbers used by execWrite; start() picks the implementation.
262 private interface RandomnessProvider {
// Returns the next number to use; the visible implementations pass `bound` straight to
// SplittableRandom.nextInt(int).
263 int nextInt(int bound);
266 private static class NonConflictingProvider implements RandomnessProvider {
268 private final SplittableRandom random = new SplittableRandom();
269 private final LinkedHashSet<Integer> previousNumbers = new LinkedHashSet<>();
272 public int nextInt(int bound) {
275 nextInt = random.nextInt(bound);
276 } while (previousNumbers.contains(nextInt));
278 if (previousNumbers.size() > 100000) {
279 previousNumbers.iterator().remove();
281 previousNumbers.add(nextInt);
287 private static class BasicProvider implements RandomnessProvider {
289 private final SplittableRandom random = new SplittableRandom();
292 public int nextInt(int bound) {
293 return random.nextInt(bound);
// Factory for the write transactions used by the handler; start() picks the implementation.
297 private interface TxProvider {
// Returns a new write transaction from the backing broker or transaction chain.
299 DOMDataWriteTransaction createTransaction();
// TxProvider that hands out write-only transactions from a single transaction chain.
302 private static class TxChainBackedProvider implements TxProvider {
// Chain that createTransaction() allocates from.
304 private final DOMTransactionChain transactionChain;
// Creates the chain on the given broker with a TestChainListener that receives the
// completion future and the executor.
306 TxChainBackedProvider(final DOMDataBroker dataBroker,
307 final SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture,
308 final ScheduledExecutorService executor) {
// NOTE(review): the assignment target is not visible in this view; presumably the created
// chain is stored in transactionChain - confirm.
311 dataBroker.createTransactionChain(new TestChainListener(completionFuture, executor));
// Allocates the next write-only transaction on the chain.
315 public DOMDataWriteTransaction createTransaction() {
316 return transactionChain.newWriteOnlyTransaction();
320 private static class DataBrokerBackedProvider implements TxProvider {
322 private final DOMDataBroker dataBroker;
324 DataBrokerBackedProvider(final DOMDataBroker dataBroker) {
325 this.dataBroker = dataBroker;
329 public DOMDataWriteTransaction createTransaction() {
330 return dataBroker.newWriteOnlyTransaction();
// Transaction chain listener that turns a chain failure into a failed RPC result.
334 private static class TestChainListener implements TransactionChainListener {
// Future that receives the failed RPC result when the chain fails.
336 private final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture;
// Executor supplied by the handler.
// NOTE(review): no use of this field is visible in this view - confirm how it is used.
337 private final ScheduledExecutorService executor;
// Stores the result future and the executor.
339 TestChainListener(final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture,
340 final ScheduledExecutorService executor) {
342 this.resultFuture = resultFuture;
343 this.executor = executor;
// Logs the failure and completes the result future with a failed RPC result carrying the cause.
347 public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
348 final AsyncTransaction<?, ?> transaction,
349 final Throwable cause) {
350 LOG.warn("Transaction chain failed.", cause);
351 resultFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
352 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build());
// Successful closure of the chain is only logged.
358 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
359 LOG.debug("Transaction chain closed successfully.");