/*
 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.clustering.it.provider.impl;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.SettableFuture;
15 import java.util.ArrayList;
16 import java.util.HashSet;
17 import java.util.LinkedHashSet;
18 import java.util.List;
20 import java.util.SplittableRandom;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
29 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
34 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
35 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
36 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
37 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
38 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutputBuilder;
39 import org.opendaylight.yangtools.yang.common.QName;
40 import org.opendaylight.yangtools.yang.common.RpcError;
41 import org.opendaylight.yangtools.yang.common.RpcResult;
42 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
46 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
47 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
48 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
49 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
50 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
54 public class WriteTransactionsHandler implements Runnable {
56 private static final Logger LOG = LoggerFactory.getLogger(WriteTransactionsHandler.class);
57 private static final int SECOND_AS_NANO = 1000000000;
58 //2^20 as in the model
59 private static final int MAX_ITEM = 1048576;
61 private static final QName ID_INTS =
62 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints");
63 private static final QName ID_INT =
64 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int");
65 private static final QName ID =
66 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id");
67 private static final QName ITEM =
68 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item");
69 private static final QName NUMBER =
70 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number");
72 public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
73 public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT);
75 private final DOMDataBroker domDataBroker;
76 private final Long timeToTake;
77 private final Long delay;
78 private final String id;
79 private final WriteTransactionsInput input;
81 private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
82 private final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
83 private final Set<Integer> usedValues = new HashSet<>();
85 private RandomnessProvider random;
86 private TxProvider txProvider;
88 private long startTime;
89 private SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture;
91 private long allTx = 0;
92 private long insertTx = 0;
93 private long deleteTx = 0;
94 private ScheduledFuture<?> scheduledFuture;
95 private YangInstanceIdentifier idListWithKey;
/**
 * Creates a handler for one write-transactions request.
 *
 * @param domDataBroker broker used to create the write transactions
 * @param input request parameters; the run time in seconds, the transactions-per-second rate and the list
 *              id are read here, the remaining settings are read when the handler is started
 */
public WriteTransactionsHandler(final DOMDataBroker domDataBroker, final WriteTransactionsInput input) {
    this.input = input;
    this.domDataBroker = domDataBroker;

    // Both values are kept in nanoseconds: total run time and the period between two writes.
    this.timeToTake = input.getSeconds() * SECOND_AS_NANO;
    this.delay = SECOND_AS_NANO / input.getTransactionsPerSecond();
    this.id = input.getId();
}
108 final long current = System.nanoTime();
110 futures.add(execWrite());
112 maybeFinish(current);
115 public void start(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
116 LOG.debug("Starting write-transactions.");
118 if (input.isChainedTransactions()) {
119 txProvider = new TxChainBackedProvider(domDataBroker, settableFuture, executor);
120 random = new BasicProvider();
122 txProvider = new DataBrokerBackedProvider(domDataBroker);
123 random = new NonConflictingProvider();
126 if (ensureListExists(settableFuture) && fillInitialList(settableFuture)) {
127 startTime = System.nanoTime();
128 completionFuture = settableFuture;
129 scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
135 private boolean ensureListExists(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
137 final ContainerNode containerNode = ImmutableContainerNodeBuilder.create()
138 .withNodeIdentifier(new NodeIdentifier(ID_INTS))
139 .withChild(ImmutableNodes.mapNodeBuilder(ID_INT).build())
142 DOMDataWriteTransaction tx = txProvider.createTransaction();
143 // write only the top list
144 tx.merge(LogicalDatastoreType.CONFIGURATION, ID_INTS_YID, containerNode);
146 tx.submit().checkedGet(125, TimeUnit.SECONDS);
147 } catch (final OptimisticLockFailedException e) {
148 // when multiple write-transactions are executed concurrently we need to ignore this.
149 // If we get optimistic lock here it means id-ints already exists and we can continue.
150 LOG.debug("Got an optimistic lock when writing initial top level list element.", e);
151 } catch (final TransactionCommitFailedException | TimeoutException e) {
152 LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
153 settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
154 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
158 final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INT, ID, id)
159 .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
162 idListWithKey = ID_INT_YID.node(entry.getIdentifier());
163 tx = txProvider.createTransaction();
164 tx.merge(LogicalDatastoreType.CONFIGURATION, idListWithKey, entry);
167 tx.submit().checkedGet(125, TimeUnit.SECONDS);
168 } catch (final Exception e) {
169 LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
170 settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
171 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
178 private boolean fillInitialList(final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture) {
179 LOG.debug("Filling the item list with initial values.");
181 final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM);
183 final YangInstanceIdentifier itemListId = idListWithKey.node(ITEM);
184 final DOMDataWriteTransaction tx = txProvider.createTransaction();
185 tx.put(LogicalDatastoreType.CONFIGURATION, itemListId, mapBuilder.build());
188 tx.submit().checkedGet(125, TimeUnit.SECONDS);
189 } catch (final Exception e) {
190 LOG.warn("Unable to fill the initial item list.", e);
191 settableFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
192 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
199 private CheckedFuture<Void, TransactionCommitFailedException> execWrite() {
200 final int i = random.nextInt(MAX_ITEM + 1);
202 final YangInstanceIdentifier entryId =
203 idListWithKey.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i));
205 final DOMDataWriteTransaction tx = txProvider.createTransaction();
208 if (usedValues.contains(i)) {
209 LOG.debug("Deleting item: {}", i);
211 tx.delete(LogicalDatastoreType.CONFIGURATION, entryId);
212 usedValues.remove(i);
215 LOG.debug("Inserting item: {}", i);
217 final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i);
218 tx.put(LogicalDatastoreType.CONFIGURATION, entryId, entry);
225 private void maybeFinish(final long current) {
226 if ((current - startTime) > timeToTake) {
227 LOG.debug("Reached max running time, waiting for futures to complete.");
228 scheduledFuture.cancel(false);
230 final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
233 // Timeout from cds should be 2 minutes so leave some leeway.
234 allFutures.get(125, TimeUnit.SECONDS);
236 LOG.debug("All futures completed successfully.");
238 final WriteTransactionsOutput output = new WriteTransactionsOutputBuilder()
240 .setInsertTx(insertTx)
241 .setDeleteTx(deleteTx)
244 completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>success()
245 .withResult(output).build());
248 } catch (Exception exception) {
249 LOG.error("Write transactions failed.", exception);
250 completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
251 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", exception).build());
258 private interface RandomnessProvider {
259 int nextInt(int bound);
262 private static class NonConflictingProvider implements RandomnessProvider {
264 private final SplittableRandom random = new SplittableRandom();
265 private final LinkedHashSet<Integer> previousNumbers = new LinkedHashSet<>();
268 public int nextInt(int bound) {
271 nextInt = random.nextInt(bound);
272 } while (previousNumbers.contains(nextInt));
274 if (previousNumbers.size() > 100000) {
275 previousNumbers.iterator().remove();
277 previousNumbers.add(nextInt);
283 private static class BasicProvider implements RandomnessProvider {
285 private final SplittableRandom random = new SplittableRandom();
288 public int nextInt(int bound) {
289 return random.nextInt(bound);
293 private interface TxProvider {
295 DOMDataWriteTransaction createTransaction();
298 private static class TxChainBackedProvider implements TxProvider {
300 private final DOMTransactionChain transactionChain;
302 TxChainBackedProvider(final DOMDataBroker dataBroker,
303 final SettableFuture<RpcResult<WriteTransactionsOutput>> completionFuture,
304 final ScheduledExecutorService executor) {
307 dataBroker.createTransactionChain(new TestChainListener(completionFuture, executor));
311 public DOMDataWriteTransaction createTransaction() {
312 return transactionChain.newWriteOnlyTransaction();
316 private static class DataBrokerBackedProvider implements TxProvider {
318 private final DOMDataBroker dataBroker;
320 DataBrokerBackedProvider(final DOMDataBroker dataBroker) {
321 this.dataBroker = dataBroker;
325 public DOMDataWriteTransaction createTransaction() {
326 return dataBroker.newWriteOnlyTransaction();
330 private static class TestChainListener implements TransactionChainListener {
332 private final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture;
333 private final ScheduledExecutorService executor;
335 TestChainListener(final SettableFuture<RpcResult<WriteTransactionsOutput>> resultFuture,
336 final ScheduledExecutorService executor) {
338 this.resultFuture = resultFuture;
339 this.executor = executor;
343 public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
344 final AsyncTransaction<?, ?> transaction,
345 final Throwable cause) {
346 LOG.warn("Transaction chain failed.", cause);
347 resultFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
348 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build());
354 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
355 LOG.debug("Transaction chain closed successfully.");