2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider.impl;
11 import com.google.common.base.Stopwatch;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.HashSet;
19 import java.util.List;
21 import java.util.SplittableRandom;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
34 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
35 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
36 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
37 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutputBuilder;
38 import org.opendaylight.yangtools.yang.common.QName;
39 import org.opendaylight.yangtools.yang.common.RpcError;
40 import org.opendaylight.yangtools.yang.common.RpcResult;
41 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
44 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
45 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
46 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
50 public class ProduceTransactionsHandler implements Runnable {
52 private static final Logger LOG = LoggerFactory.getLogger(ProduceTransactionsHandler.class);
53 private static final int SECOND_AS_NANO = 1000000000;
54 //2^20 as in the model
55 private static final int MAX_ITEM = 1048576;
57 static final QName ID_INTS =
58 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
59 public static final QName ID_INT =
60 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
61 static final QName ID =
62 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
63 static final QName ITEM =
64 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
65 private static final QName NUMBER =
66 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number".intern());
68 public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
69 public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
71 private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
72 private final List<ListenableFuture<Void>> futures = new ArrayList<>();
73 private final Set<Integer> usedValues = new HashSet<>();
74 private final SplittableRandom random = new SplittableRandom();
76 private final DOMDataTreeService domDataTreeService;
77 private final long runtimeNanos;
78 private final long delayNanos;
79 private final String id;
81 private SettableFuture<RpcResult<ProduceTransactionsOutput>> completionFuture;
82 private Stopwatch stopwatch;
84 private long allTx = 0;
85 private long insertTx = 0;
86 private long deleteTx = 0;
87 private ScheduledFuture<?> scheduledFuture;
88 private DOMDataTreeProducer itemProducer;
89 private DOMDataTreeIdentifier idListItem;
/**
 * Creates a handler that will produce transactions as described by the given input.
 *
 * <p>The length of the run is {@code input.getSeconds()} converted to nanoseconds; the delay between
 * two transactions is one second, in nanoseconds, divided by {@code input.getTransactionsPerSecond()}.
 *
 * <p>NOTE(review): the assignment of the final field {@code id} and the end of this constructor are
 * not visible in this extract - confirm against the complete file.
 *
 * @param domDataTreeService service kept for creating the data tree producer later on
 * @param input supplies the run duration and the transaction rate
 */
91 public ProduceTransactionsHandler(final DOMDataTreeService domDataTreeService,
92 final ProduceTransactionsInput input) {
94 this.domDataTreeService = domDataTreeService;
96 runtimeNanos = TimeUnit.SECONDS.toNanos(input.getSeconds());
// NOTE(review): presumably an integer division, so a rate of zero would throw here and a rate above
// one billion would yield a zero delay - confirm that the model restricts the range.
97 delayNanos = SECOND_AS_NANO / input.getTransactionsPerSecond();
103 futures.add(execWrite(futures.size()));
107 public void start(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
108 completionFuture = settableFuture;
110 if (fillInitialList(completionFuture)) {
111 stopwatch = Stopwatch.createStarted();
112 scheduledFuture = executor.scheduleAtFixedRate(this, 0, delayNanos, TimeUnit.NANOSECONDS);
/**
 * Creates the item producer and writes the initial {@code item} list for this handler's id.
 *
 * <p>A producer is created for the CONFIGURATION subtree of the {@code id-int} entry keyed by
 * {@code id}. A map node built without any entries is written through a cursor, the identifier of
 * that list is remembered in {@code idListItem}, and the transaction is submitted with a wait of up
 * to 125 seconds.
 *
 * <p>NOTE(review): the {@code try} and {@code return} lines are not visible in this extract.
 * Presumably the method returns {@code true} after a successful submit and {@code false} after the
 * failure handling below - confirm against the complete file.
 *
 * @param settableFuture future that is completed with a failed result when the submit throws
 * @return whether the run may start; the caller schedules the handler only on {@code true}
 */
118 private boolean fillInitialList(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
119 LOG.debug("Filling the item list with initial values.");
// Path of the id-int entry keyed by this handler's id.
121 final YangInstanceIdentifier idListWithKey = ID_INT_YID.node(new NodeIdentifierWithPredicates(ID_INT, ID, id));
// The producer is kept in a field: execWrite() creates its transactions from it.
123 itemProducer = domDataTreeService.createProducer(
124 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)));
126 final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
127 final DOMDataTreeWriteCursor cursor =
128 tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey));
// The map node is built without entries, i.e. the list starts out empty.
130 final MapNode list = ImmutableNodes.mapNodeBuilder(ITEM).build();
131 cursor.write(list.getIdentifier(), list);
// Identifier of the list just written; execWrite() opens its cursors on it.
134 idListItem = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
135 idListWithKey.node(list.getIdentifier()).toOptimized());
138 tx.submit().checkedGet(125, TimeUnit.SECONDS);
140 } catch (final Exception e) {
141 LOG.warn("Unable to fill the initial item list.", e);
// Report the failure to the caller through the future.
142 settableFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
143 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
// NOTE(review): presumably reached only on the failure path, since the producer is needed later on
// success - confirm against the complete file.
147 itemProducer.close();
148 } catch (final DOMDataTreeProducerException exception) {
149 LOG.warn("Failure while closing producer.", exception);
/**
 * Submits one transaction that either deletes or inserts a randomly chosen item.
 *
 * <p>An item number between 0 and {@code MAX_ITEM} inclusive is drawn. If the number is recorded in
 * {@code usedValues}, the matching entry is deleted and the number is forgotten; the insert
 * statements write a map entry keyed by that number. The transaction is then submitted.
 *
 * <p>NOTE(review): the {@code else} line, the counter updates and the {@code return} statement are
 * not visible in this extract - confirm against the complete file.
 *
 * @param offset index of this transaction, used only in the debug log messages
 * @return presumably the future returned by the submit
 */
154 private ListenableFuture<Void> execWrite(final int offset) {
// nextInt's bound is exclusive, hence the + 1 to make MAX_ITEM itself reachable.
155 final int i = random.nextInt(MAX_ITEM + 1);
156 final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
157 final DOMDataTreeWriteCursor cursor = tx.createCursor(idListItem);
161 final NodeIdentifierWithPredicates entryId = new NodeIdentifierWithPredicates(ITEM, NUMBER, i);
162 if (usedValues.contains(i)) {
163 LOG.debug("Deleting item: {}", i);
165 cursor.delete(entryId);
166 usedValues.remove(i);
// NOTE(review): the statements from here to cursor.write are presumably the else branch.
169 LOG.debug("Inserting item: {}", i);
172 final MapEntryNode entry = ImmutableNodes.mapEntryBuilder().withNodeIdentifier(entryId)
173 .withChild(ImmutableNodes.leafNode(NUMBER, i)).build();
174 cursor.write(entryId, entry);
180 final ListenableFuture<Void> future = tx.submit();
// The callback only logs, so it is attached solely when debug logging is enabled.
181 if (LOG.isDebugEnabled()) {
182 Futures.addCallback(future, new FutureCallback<Void>() {
184 public void onSuccess(final Void result) {
185 LOG.debug("Future #{} completed successfully", offset);
189 public void onFailure(final Throwable cause) {
190 LOG.debug("Future #{} failed", offset, cause);
198 private void maybeFinish() {
199 final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
200 if (elapsed >= runtimeNanos) {
201 LOG.debug("Reached max running time, waiting for futures to complete.");
202 scheduledFuture.cancel(false);
204 final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
207 // Timeout from cds should be 2 minutes so leave some leeway.
208 allFutures.get(125, TimeUnit.SECONDS);
210 LOG.debug("All futures completed successfully.");
212 final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder()
214 .setInsertTx(insertTx)
215 .setDeleteTx(deleteTx)
217 completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>success()
218 .withResult(output).build());
219 } catch (ExecutionException e) {
220 LOG.error("Write transactions failed.", e.getCause());
221 completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
222 .withError(RpcError.ErrorType.APPLICATION, "Submit failed", e.getCause()).build());
223 } catch (InterruptedException | TimeoutException e) {
224 LOG.error("Write transactions failed.", e);
225 completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
226 .withError(RpcError.ErrorType.APPLICATION,
227 "Final submit was timed out by the test provider or was interrupted", e).build());
229 for (int i = 0; i < futures.size(); i++) {
230 final ListenableFuture<Void> future = futures.get(i);
233 future.get(0, TimeUnit.NANOSECONDS);
234 } catch (TimeoutException fe) {
235 LOG.warn("Future #{}/{} not completed yet", i, futures.size());
236 } catch (ExecutionException fe) {
237 LOG.warn("Future #{}/{} failed", i, futures.size(), e.getCause());
238 } catch (InterruptedException fe) {
239 LOG.warn("Interrupted while examining future #{}/{}", i, futures.size(), e);
242 } catch (Exception e) {
243 LOG.error("Write transactions failed.", e);
244 completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
245 .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
250 itemProducer.close();
251 } catch (final DOMDataTreeProducerException e) {
252 LOG.warn("Failure while closing item producer.", e);