2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider.impl;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.SettableFuture;
15 import java.util.ArrayList;
16 import java.util.Collections;
17 import java.util.HashSet;
18 import java.util.List;
20 import java.util.SplittableRandom;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
28 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
34 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
35 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
36 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
37 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutputBuilder;
38 import org.opendaylight.yangtools.yang.common.QName;
39 import org.opendaylight.yangtools.yang.common.RpcError;
40 import org.opendaylight.yangtools.yang.common.RpcResult;
41 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
44 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
45 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
46 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
47 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 public class ProduceTransactionsHandler implements Runnable {
53 private static final Logger LOG = LoggerFactory.getLogger(ProduceTransactionsHandler.class);
54 private static final int SECOND_AS_NANO = 1000000000;
55 //2^20 as in the model
56 private static final int MAX_ITEM = 1048576;
58 static final QName ID_INTS =
59 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints");
60 public static final QName ID_INT =
61 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int");
62 static final QName ID =
63 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id");
64 static final QName ITEM =
65 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item");
66 private static final QName NUMBER =
67 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number");
69 public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
70 public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT);
72 private final DOMDataTreeService domDataTreeService;
74 private final long timeToTake;
75 private final long delay;
76 private final String id;
78 private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
79 private final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
80 private final Set<Integer> usedValues = new HashSet<>();
81 private final SplittableRandom random = new SplittableRandom();
83 private long startTime;
84 private SettableFuture<RpcResult<ProduceTransactionsOutput>> completionFuture;
86 private long allTx = 0;
87 private long insertTx = 0;
88 private long deleteTx = 0;
89 private ScheduledFuture<?> scheduledFuture;
90 private DOMDataTreeProducer itemProducer;
91 private YangInstanceIdentifier idListWithKey;
93 public ProduceTransactionsHandler(final DOMDataTreeService domDataTreeService,
94 final ProduceTransactionsInput input) {
96 this.domDataTreeService = domDataTreeService;
98 timeToTake = input.getSeconds() * SECOND_AS_NANO;
99 delay = SECOND_AS_NANO / input.getTransactionsPerSecond();
105 final long current = System.nanoTime();
107 futures.add(execWrite());
109 maybeFinish(current);
112 public void start(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
113 completionFuture = settableFuture;
115 if (fillInitialList(completionFuture)) {
116 startTime = System.nanoTime();
117 scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
123 private boolean fillInitialList(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
124 LOG.debug("Filling the item list with initial values.");
126 final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM);
127 idListWithKey = ID_INT_YID.node(new NodeIdentifierWithPredicates(ID_INT, ID, id));
129 itemProducer = domDataTreeService.createProducer(
130 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)));
132 final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
133 final DOMDataTreeWriteCursor cursor =
134 tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey));
136 final MapNode list = mapBuilder.build();
137 cursor.write(list.getIdentifier(), list);
141 tx.submit().checkedGet(125, TimeUnit.SECONDS);
142 } catch (final Exception e) {
143 LOG.warn("Unable to fill the initial item list.", e);
144 settableFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
145 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
148 itemProducer.close();
149 } catch (final DOMDataTreeProducerException exception) {
150 LOG.warn("Failure while closing producer.", exception);
158 private CheckedFuture<Void, TransactionCommitFailedException> execWrite() {
159 final int i = random.nextInt(MAX_ITEM + 1);
161 final YangInstanceIdentifier entryId =
162 idListWithKey.node(ITEM).node(new NodeIdentifierWithPredicates(ITEM, NUMBER, i));
164 final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
165 final DOMDataTreeWriteCursor cursor = tx.createCursor(
166 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(ITEM)));
169 if (usedValues.contains(i)) {
170 LOG.debug("Deleting item: {}", i);
172 cursor.delete(entryId.getLastPathArgument());
173 usedValues.remove(i);
176 LOG.debug("Inserting item: {}", i);
178 final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i);
179 cursor.write(entryId.getLastPathArgument(), entry);
188 private void maybeFinish(final long current) {
189 if ((current - startTime) > timeToTake) {
190 LOG.debug("Reached max running time, waiting for futures to complete.");
191 scheduledFuture.cancel(false);
193 final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
196 // Timeout from cds should be 2 minutes so leave some leeway.
197 allFutures.get(125, TimeUnit.SECONDS);
199 LOG.debug("All futures completed successfully.");
201 final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder()
203 .setInsertTx(insertTx)
204 .setDeleteTx(deleteTx)
208 completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>success()
209 .withResult(output).build());
212 } catch (Exception exception) {
213 LOG.error("Write transactions failed.", exception);
214 completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
215 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", exception).build());
217 for (int i = 0; i < futures.size(); i++) {
218 final CheckedFuture<Void, TransactionCommitFailedException> future = futures.get(i);
219 if (!future.isDone()) {
220 LOG.warn("Future #{}/{} possibly hanged.", future, futures.size());
227 itemProducer.close();
228 } catch (final DOMDataTreeProducerException e) {
229 LOG.warn("Failure while closing item producer.", e);