Remove use of SchemaContext.getQName()
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / impl / AbstractTransactionHandler.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.clustering.it.provider.impl;
9
10 import com.google.common.base.Stopwatch;
11 import com.google.common.util.concurrent.FluentFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.HashSet;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.ScheduledFuture;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException;
22 import java.util.concurrent.atomic.AtomicLong;
23 import org.opendaylight.mdsal.common.api.CommitInfo;
24 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams;
25 import org.opendaylight.yangtools.yang.common.QName;
26 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 abstract class AbstractTransactionHandler {
31     private enum State {
32         RUNNING,
33         WAITING,
34         SUCCESSFUL,
35         FAILED,
36     }
37
38     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
39
40     static final int SECOND_AS_NANO = 1_000_000_000;
41     //2^20 as in the model
42     static final int MAX_ITEM = 1048576;
43
44     static final QName ID_INTS =
45             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
46     static final QName ID =
47             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
48     static final QName ITEM =
49             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
50     static final QName NUMBER =
51             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
52
53     public static final QName ID_INT =
54             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
55     public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
56     public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
57
58     static final long INIT_TX_TIMEOUT_SECONDS = 125;
59
60     private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
61
62     /*
63      * writingExecutor is a single thread executor. Only this thread will write to datastore,
64      * incurring sleep penalties if backend is not responsive. This thread never changes, but reads State.
65      * This thread only adds to futures set.
66      */
67     private final ScheduledExecutorService writingExecutor = FinalizableScheduledExecutorService.newSingleThread();
68     /*
69      * completingExecutor is a single thread executor. Only this thread writes to State.
70      * This thread should never incur any sleep penalty, so RPC response should always come on time.
71      * This thread only removes from futures set.
72      */
73     private final ScheduledExecutorService completingExecutor = FinalizableScheduledExecutorService.newSingleThread();
74     private final Collection<ListenableFuture<?>> futures = Collections.synchronizedSet(new HashSet<>());
75     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
76     private final long runtimeNanos;
77     private final long delayNanos;
78
79     private ScheduledFuture<?> writingFuture;
80     private ScheduledFuture<?> completingFuture;
81     private final AtomicLong txCounter = new AtomicLong();
82     private volatile State state;
83
84     AbstractTransactionHandler(final TransactionsParams params) {
85         runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds().toJava());
86         delayNanos = SECOND_AS_NANO / params.getTransactionsPerSecond().toJava();
87     }
88
89     final synchronized void doStart() {
90         // Setup state first...
91         stopwatch.start();
92         state = State.RUNNING;
93
94         writingFuture = writingExecutor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
95     }
96
97     private void execute() {
98         // Single volatile access
99         final State local = state;
100
101         switch (local) {
102             case FAILED:
103                 // This could happen due to scheduling artifacts
104                 break;
105             case RUNNING:
106                 runningExecute();
107                 break;
108             default:
109                 throw new IllegalStateException("Unhandled state " + local);
110         }
111     }
112
113     private void runningExecute() {
114         final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
115         if (elapsed >= runtimeNanos) {
116             LOG.debug("Reached maximum run time with {} outstanding futures", futures.size());
117             completingExecutor.schedule(this::runtimeUp, 0, TimeUnit.SECONDS);
118             return;
119         }
120
121         // Not completed yet: create a transaction and hook it up
122         final long txId = txCounter.incrementAndGet();
123         final FluentFuture<? extends CommitInfo> execFuture = execWrite(txId);
124         LOG.debug("New future #{} allocated", txId);
125
126         // Ordering is important: we need to add the future before hooking the callback
127         futures.add(execFuture);
128         execFuture.addCallback(new FutureCallback<CommitInfo>() {
129             @Override
130             public void onSuccess(final CommitInfo result) {
131                 txSuccess(execFuture, txId);
132             }
133
134             @Override
135             public void onFailure(final Throwable cause) {
136                 txFailure(execFuture, txId, cause);
137             }
138         }, completingExecutor);
139     }
140
141     private void runtimeUp() {
142         // checkSuccessful has two call sites, it is simpler to create completingFuture unconditionally.
143         completingFuture = completingExecutor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
144         if (!checkSuccessful()) {
145             state = State.WAITING;
146             writingFuture.cancel(false);
147         }
148     }
149
150     private boolean checkSuccessful() {
151         if (futures.isEmpty()) {
152             LOG.debug("Completed waiting for all futures");
153             state = State.SUCCESSFUL;
154             completingFuture.cancel(false);
155             runSuccessful(txCounter.get());
156             return true;
157         }
158
159         return false;
160     }
161
162     final void txSuccess(final ListenableFuture<?> execFuture, final long txId) {
163         LOG.debug("Future #{} completed successfully", txId);
164         futures.remove(execFuture);
165
166         final State local = state;
167         switch (local) {
168             case FAILED:
169             case RUNNING:
170                 // No-op
171                 break;
172             case WAITING:
173                 checkSuccessful();
174                 break;
175             default:
176                 throw new IllegalStateException("Unhandled state " + local);
177         }
178     }
179
180     final void txFailure(final ListenableFuture<?> execFuture, final long txId, final Throwable cause) {
181         LOG.error("Commit future failed for tx # {}", txId, cause);
182         futures.remove(execFuture);
183
184         final State local = state;
185         switch (local) {
186             case FAILED:
187                 // no-op
188                 break;
189             case RUNNING:
190             case WAITING:
191                 state = State.FAILED;
192                 writingFuture.cancel(false);
193                 runFailed(cause, txId);
194                 break;
195             default:
196                 throw new IllegalStateException("Unhandled state " + local);
197         }
198     }
199
200     private void checkComplete() {
201         final int size = futures.size();
202         if (size == 0) {
203             return;
204         }
205
206         // Guards iteration against concurrent modification from callbacks
207         synchronized (futures) {
208             int offset = 0;
209
210             for (ListenableFuture<?> future : futures) {
211                 try {
212                     future.get(0, TimeUnit.NANOSECONDS);
213                 } catch (final TimeoutException e) {
214                     LOG.warn("Future #{}/{} not completed yet", offset, size);
215                 } catch (final ExecutionException e) {
216                     LOG.warn("Future #{}/{} failed", offset, size, e.getCause());
217                 } catch (final InterruptedException e) {
218                     LOG.warn("Interrupted while examining future #{}/{}", offset, size, e);
219                 }
220
221                 ++offset;
222             }
223         }
224
225         state = State.FAILED;
226         runTimedOut("Transactions did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds");
227     }
228
229     abstract FluentFuture<? extends CommitInfo> execWrite(long txId);
230
231     abstract void runFailed(Throwable cause, long txId);
232
233     abstract void runSuccessful(long allTx);
234
235     abstract void runTimedOut(String cause);
236 }