Bump mdsal to 5.0.2
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / impl / AbstractTransactionHandler.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.clustering.it.provider.impl;
9
10 import com.google.common.base.Stopwatch;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.HashSet;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.ScheduledFuture;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException;
22 import java.util.concurrent.atomic.AtomicLong;
23 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams;
24 import org.opendaylight.yangtools.yang.common.QName;
25 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 abstract class AbstractTransactionHandler {
30     private enum State {
31         RUNNING,
32         WAITING,
33         SUCCESSFUL,
34         FAILED,
35     }
36
37     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
38
39     static final int SECOND_AS_NANO = 1_000_000_000;
40     //2^20 as in the model
41     static final int MAX_ITEM = 1048576;
42
43     static final QName ID_INTS =
44             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
45     static final QName ID =
46             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
47     static final QName ITEM =
48             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
49     static final QName NUMBER =
50             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
51
52     public static final QName ID_INT =
53             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
54     public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
55     public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
56
57     static final long INIT_TX_TIMEOUT_SECONDS = 125;
58
59     private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
60
61     /*
62      * writingExecutor is a single thread executor. Only this thread will write to datastore,
63      * incurring sleep penalties if backend is not responsive. This thread never changes, but reads State.
64      * This thread only adds to futures set.
65      */
66     private final ScheduledExecutorService writingExecutor = FinalizableScheduledExecutorService.newSingleThread();
67     /*
68      * completingExecutor is a single thread executor. Only this thread writes to State.
69      * This thread should never incur any sleep penalty, so RPC response should always come on time.
70      * This thread only removes from futures set.
71      */
72     private final ScheduledExecutorService completingExecutor = FinalizableScheduledExecutorService.newSingleThread();
73     private final Collection<ListenableFuture<?>> futures = Collections.synchronizedSet(new HashSet<>());
74     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
75     private final long runtimeNanos;
76     private final long delayNanos;
77
78     private ScheduledFuture<?> writingFuture;
79     private ScheduledFuture<?> completingFuture;
80     private final AtomicLong txCounter = new AtomicLong();
81     private volatile State state;
82
83     AbstractTransactionHandler(final TransactionsParams params) {
84         runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds().toJava());
85         delayNanos = SECOND_AS_NANO / params.getTransactionsPerSecond().toJava();
86     }
87
88     final synchronized void doStart() {
89         // Setup state first...
90         stopwatch.start();
91         state = State.RUNNING;
92
93         writingFuture = writingExecutor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
94     }
95
96     private void execute() {
97         // Single volatile access
98         final State local = state;
99
100         switch (local) {
101             case FAILED:
102                 // This could happen due to scheduling artifacts
103                 break;
104             case RUNNING:
105                 runningExecute();
106                 break;
107             default:
108                 throw new IllegalStateException("Unhandled state " + local);
109         }
110     }
111
112     private void runningExecute() {
113         final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
114         if (elapsed >= runtimeNanos) {
115             LOG.debug("Reached maximum run time with {} outstanding futures", futures.size());
116             completingExecutor.schedule(this::runtimeUp, 0, TimeUnit.SECONDS);
117             return;
118         }
119
120         // Not completed yet: create a transaction and hook it up
121         final long txId = txCounter.incrementAndGet();
122         final ListenableFuture<?> execFuture = execWrite(txId);
123         LOG.debug("New future #{} allocated", txId);
124
125         // Ordering is important: we need to add the future before hooking the callback
126         futures.add(execFuture);
127         Futures.addCallback(execFuture, new FutureCallback<Object>() {
128             @Override
129             public void onSuccess(final Object result) {
130                 txSuccess(execFuture, txId);
131             }
132
133             @Override
134             public void onFailure(final Throwable cause) {
135                 txFailure(execFuture, txId, cause);
136             }
137         }, completingExecutor);
138     }
139
140     private void runtimeUp() {
141         // checkSuccessful has two call sites, it is simpler to create completingFuture unconditionally.
142         completingFuture = completingExecutor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
143         if (!checkSuccessful()) {
144             state = State.WAITING;
145             writingFuture.cancel(false);
146         }
147     }
148
149     private boolean checkSuccessful() {
150         if (futures.isEmpty()) {
151             LOG.debug("Completed waiting for all futures");
152             state = State.SUCCESSFUL;
153             completingFuture.cancel(false);
154             runSuccessful(txCounter.get());
155             return true;
156         }
157
158         return false;
159     }
160
161     final void txSuccess(final ListenableFuture<?> execFuture, final long txId) {
162         LOG.debug("Future #{} completed successfully", txId);
163         futures.remove(execFuture);
164
165         final State local = state;
166         switch (local) {
167             case FAILED:
168             case RUNNING:
169                 // No-op
170                 break;
171             case WAITING:
172                 checkSuccessful();
173                 break;
174             default:
175                 throw new IllegalStateException("Unhandled state " + local);
176         }
177     }
178
179     final void txFailure(final ListenableFuture<?> execFuture, final long txId, final Throwable cause) {
180         LOG.error("Commit future failed for tx # {}", txId, cause);
181         futures.remove(execFuture);
182
183         final State local = state;
184         switch (local) {
185             case FAILED:
186                 // no-op
187                 break;
188             case RUNNING:
189             case WAITING:
190                 state = State.FAILED;
191                 writingFuture.cancel(false);
192                 runFailed(cause, txId);
193                 break;
194             default:
195                 throw new IllegalStateException("Unhandled state " + local);
196         }
197     }
198
199     private void checkComplete() {
200         final int size = futures.size();
201         if (size == 0) {
202             return;
203         }
204
205         // Guards iteration against concurrent modification from callbacks
206         synchronized (futures) {
207             int offset = 0;
208
209             for (ListenableFuture<?> future : futures) {
210                 try {
211                     future.get(0, TimeUnit.NANOSECONDS);
212                 } catch (final TimeoutException e) {
213                     LOG.warn("Future #{}/{} not completed yet", offset, size);
214                 } catch (final ExecutionException e) {
215                     LOG.warn("Future #{}/{} failed", offset, size, e.getCause());
216                 } catch (final InterruptedException e) {
217                     LOG.warn("Interrupted while examining future #{}/{}", offset, size, e);
218                 }
219
220                 ++offset;
221             }
222         }
223
224         state = State.FAILED;
225         runTimedOut("Transactions did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds");
226     }
227
228     abstract ListenableFuture<?> execWrite(long txId);
229
230     abstract void runFailed(Throwable cause, long txId);
231
232     abstract void runSuccessful(long allTx);
233
234     abstract void runTimedOut(String cause);
235 }