BUG 8462: Switch to using cds-client in unsubscribe-ddtl
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / impl / ProduceTransactionsHandler.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider.impl;
10
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.SettableFuture;
15 import java.util.ArrayList;
16 import java.util.Collections;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Set;
20 import java.util.SplittableRandom;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
28 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
34 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
35 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
36 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
37 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutputBuilder;
38 import org.opendaylight.yangtools.yang.common.QName;
39 import org.opendaylight.yangtools.yang.common.RpcError;
40 import org.opendaylight.yangtools.yang.common.RpcResult;
41 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
44 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
45 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
46 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
47 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 public class ProduceTransactionsHandler implements Runnable {
52
53     private static final Logger LOG = LoggerFactory.getLogger(ProduceTransactionsHandler.class);
54     private static final int SECOND_AS_NANO = 1000000000;
55     //2^20 as in the model
56     private static final int MAX_ITEM = 1048576;
57
58     static final QName ID_INTS =
59             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints");
60     public static final QName ID_INT =
61             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int");
62     static final QName ID =
63             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id");
64     static final QName ITEM =
65             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item");
66     private static final QName NUMBER =
67             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number");
68
69     public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
70     public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT);
71
72     private final DOMDataTreeService domDataTreeService;
73
74     private final long timeToTake;
75     private final long delay;
76     private final String id;
77
78     private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
79     private final ArrayList<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
80     private final Set<Integer> usedValues = new HashSet<>();
81     private final SplittableRandom random = new SplittableRandom();
82
83     private long startTime;
84     private SettableFuture<RpcResult<ProduceTransactionsOutput>> completionFuture;
85
86     private long allTx = 0;
87     private long insertTx = 0;
88     private long deleteTx = 0;
89     private ScheduledFuture<?> scheduledFuture;
90     private DOMDataTreeProducer itemProducer;
91     private YangInstanceIdentifier idListWithKey;
92
93     public ProduceTransactionsHandler(final DOMDataTreeService domDataTreeService,
94                                       final ProduceTransactionsInput input) {
95
96         this.domDataTreeService = domDataTreeService;
97
98         timeToTake = input.getSeconds() * SECOND_AS_NANO;
99         delay = SECOND_AS_NANO / input.getTransactionsPerSecond();
100         id = input.getId();
101     }
102
103     @Override
104     public void run() {
105         final long current = System.nanoTime();
106
107         futures.add(execWrite());
108
109         maybeFinish(current);
110     }
111
112     public void start(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
113         completionFuture = settableFuture;
114
115         if (fillInitialList(completionFuture)) {
116             startTime = System.nanoTime();
117             scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
118         } else {
119             executor.shutdown();
120         }
121     }
122
123     private boolean fillInitialList(final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture) {
124         LOG.debug("Filling the item list with initial values.");
125
126         final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ITEM);
127         for (int i = 0; i < MAX_ITEM / 2; i++) {
128             usedValues.add(i);
129             mapBuilder.withChild(ImmutableNodes.mapEntry(ITEM, NUMBER, i));
130         }
131
132         idListWithKey = ID_INT_YID.node(new NodeIdentifierWithPredicates(ID_INT, ID, id));
133
134         itemProducer = domDataTreeService.createProducer(
135                 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)));
136
137         final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
138         final DOMDataTreeWriteCursor cursor =
139                 tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey));
140
141         final MapNode list = mapBuilder.build();
142         cursor.write(list.getIdentifier(), list);
143         cursor.close();
144
145         try {
146             tx.submit().checkedGet(125, TimeUnit.SECONDS);
147         } catch (final TransactionCommitFailedException | TimeoutException e) {
148             LOG.warn("Unable to fill the initial item list.", e);
149             settableFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
150                     .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
151
152             try {
153                 itemProducer.close();
154             } catch (final DOMDataTreeProducerException exception) {
155                 LOG.warn("Failure while closing producer.", exception);
156             }
157             return false;
158         }
159
160         return true;
161     }
162
163     private CheckedFuture<Void, TransactionCommitFailedException> execWrite() {
164         final int i = random.nextInt(MAX_ITEM + 1);
165
166         final YangInstanceIdentifier entryId =
167                 idListWithKey.node(ITEM).node(new NodeIdentifierWithPredicates(ITEM, NUMBER, i));
168
169         final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
170         final DOMDataTreeWriteCursor cursor = tx.createCursor(
171                 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(ITEM)));
172         allTx++;
173
174         if (usedValues.contains(i)) {
175             LOG.debug("Deleting item: {}", i);
176             deleteTx++;
177             cursor.delete(entryId.getLastPathArgument());
178             usedValues.remove(i);
179
180         } else {
181             LOG.debug("Inserting item: {}", i);
182             insertTx++;
183             final MapEntryNode entry = ImmutableNodes.mapEntry(ITEM, NUMBER, i);
184             cursor.write(entryId.getLastPathArgument(), entry);
185             usedValues.add(i);
186         }
187
188         cursor.close();
189
190         return tx.submit();
191     }
192
193     private void maybeFinish(final long current) {
194         if ((current - startTime) > timeToTake) {
195             LOG.debug("Reached max running time, waiting for futures to complete.");
196             scheduledFuture.cancel(false);
197
198             final ListenableFuture<List<Void>> allFutures = Futures.allAsList(futures);
199
200             try {
201                 // Timeout from cds should be 2 minutes so leave some leeway.
202                 allFutures.get(125, TimeUnit.SECONDS);
203
204                 LOG.debug("All futures completed successfully.");
205
206                 final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder()
207                         .setAllTx(allTx)
208                         .setInsertTx(insertTx)
209                         .setDeleteTx(deleteTx)
210                         .build();
211
212
213                 completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>success()
214                         .withResult(output).build());
215
216                 executor.shutdown();
217             } catch (InterruptedException | ExecutionException | TimeoutException exception) {
218                 LOG.error("Write transactions failed.", exception);
219                 completionFuture.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
220                         .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", exception).build());
221
222                 executor.shutdown();
223             } finally {
224                 try {
225                     itemProducer.close();
226                 } catch (final DOMDataTreeProducerException e) {
227                     LOG.warn("Failure while closing item producer.", e);
228                 }
229             }
230         }
231     }
232 }