/*
 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.clustering.it.provider.impl;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SplittableRandom;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutputBuilder;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
50 public class ProduceTransactionsHandler implements Runnable {
52 private static final Logger LOG = LoggerFactory.getLogger(ProduceTransactionsHandler.class);
53 private static final int SECOND_AS_NANO = 1000000000;
54 //2^20 as in the model
55 private static final int MAX_ITEM = 1048576;
57 private static final QName ID_INTS =
58 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints");
59 private static final QName ID =
60 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id");
61 private static final QName ITEM =
62 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item");
63 private static final QName NUMBER =
64 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number");
66 public static final YangInstanceIdentifier ID_INTS_YID =
67 YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(ID_INTS));
69 private final DOMDataTreeService domDataTreeService;
71 private final long timeToTake;
72 private final long delay;
73 private final String id;
75 private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
76 private final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
77 private final Set<Integer> usedValues = new HashSet<>();
78 private final SplittableRandom random = new SplittableRandom();
80 private long startTime;
81 private SettableFuture<RpcResult<ProduceTransactionsOutput>> completionFuture;
83 private long allTx = 0;
84 private long insertTx = 0;
85 private long deleteTx = 0;
86 private ScheduledFuture<?> scheduledFuture;
87 private YangInstanceIdentifier idListWithKey;
88 private DOMDataTreeProducer itemProducer;
/**
 * Creates a handler configured from the RPC input.
 *
 * @param domDataTreeService service used to create data tree producers
 * @param input RPC input supplying the list id, the running time in seconds and the
 *              number of transactions to produce per second
 */
public ProduceTransactionsHandler(final DOMDataTreeService domDataTreeService,
        final ProduceTransactionsInput input) {
    this.domDataTreeService = domDataTreeService;

    // Both values are kept in nanoseconds to match System.nanoTime() and the scheduler's time unit.
    timeToTake = input.getSeconds() * SECOND_AS_NANO;
    delay = SECOND_AS_NANO / input.getTransactionsPerSecond();
    // The final id field has to be assigned here; it keys the id-ints entry written later.
    id = input.getId();
}

/**
 * Periodic task body: submits one write transaction, remembers its commit future and then
 * checks whether the requested running time has elapsed.
 */
@Override
public void run() {
    // Take the timestamp before the write so the write itself does not skew the check.
    final long current = System.nanoTime();

    futures.add(execWrite());

    maybeFinish(current);
}
109 public void start(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
111 if (ensureListExists(completionFuture) && fillInitialList(completionFuture)) {
112 startTime = System.nanoTime();
113 completionFuture = settableFuture;
114 scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
120 private boolean ensureListExists(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
122 final MapEntryNode entry = ImmutableNodes.mapEntryBuilder(ID_INTS, ID, id)
123 .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
125 final MapNode mapNode =
126 ImmutableNodes.mapNodeBuilder(ID_INTS)
130 final DOMDataTreeProducer producer = domDataTreeService.createProducer(Collections.singleton(
131 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY)));
133 final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(false);
135 final DOMDataTreeWriteCursor cursor =
136 tx.createCursor(new DOMDataTreeIdentifier(
137 LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY));
139 idListWithKey = ID_INTS_YID.node(entry.getIdentifier());
141 cursor.merge(mapNode.getIdentifier(), mapNode);
145 tx.submit().checkedGet();
146 } catch (TransactionCommitFailedException e) {
147 LOG.warn("Unable to ensure IdInts list for id: {} exists.", id, e);
148 settableFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
149 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
154 } catch (DOMDataTreeProducerException e) {
155 LOG.warn("Error while closing producer.", e);
162 private boolean fillInitialList(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
163 LOG.debug("Filling the item list with initial values.");
165 final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM);
166 for (int i = 0; i < MAX_ITEM / 2; i++) {
168 mapBuilder.withChild(ImmutableNodes.mapEntry(ITEM, NUMBER, i));
171 itemProducer = domDataTreeService.createProducer(
172 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)));
174 final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
175 final DOMDataTreeWriteCursor cursor =
176 tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey));
178 final MapNode list = mapBuilder.build();
179 cursor.write(list.getIdentifier(), list);
183 tx.submit().checkedGet();
184 } catch (final TransactionCommitFailedException e) {
185 LOG.warn("Unable to fill the initial item list.", e);
186 settableFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
187 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
194 private CheckedFuture<Void, TransactionCommitFailedException> execWrite() {
195 final int i = random.nextInt(MAX_ITEM + 1);
197 final YangInstanceIdentifier entryId =
198 idListWithKey.node(ITEM).node(new YangInstanceIdentifier.NodeIdentifierWithPredicates(ITEM, NUMBER, i));
200 final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
201 final DOMDataTreeWriteCursor cursor = tx.createCursor(
202 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(ITEM)));
205 if (usedValues.contains(i)) {
206 LOG.debug("Deleting item: {}", i);
208 cursor.delete(entryId.getLastPathArgument());
209 usedValues.remove(i);
212 LOG.debug("Inserting item: {}", i);
214 final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i);
215 cursor.write(entryId.getLastPathArgument(), entry);
224 private void maybeFinish(final long current) {
225 if ((current - startTime) > timeToTake) {
226 LOG.debug("Reached max running time, waiting for futures to complete.");
227 scheduledFuture.cancel(false);
229 final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
231 Futures.addCallback(allFutures, new FutureCallback<List<Void>>() {
233 public void onSuccess(@Nullable final List<Void> result) {
234 LOG.debug("All futures completed successfully.");
236 final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder()
238 .setInsertTx(insertTx)
239 .setDeleteTx(deleteTx)
242 completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>success()
243 .withResult(output).build());
249 public void onFailure(final Throwable t) {
250 LOG.error("Write transactions failed.", t);
251 completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
252 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", t).build());