Bump versions to 4.0.0-SNAPSHOT
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / impl / AbstractTransactionHandler.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.clustering.it.provider.impl;
9
10 import com.google.common.base.Stopwatch;
11 import com.google.common.util.concurrent.FluentFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.HashSet;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.ScheduledThreadPoolExecutor;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import java.util.concurrent.atomic.AtomicLong;
25 import org.opendaylight.mdsal.common.api.CommitInfo;
26 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams;
27 import org.opendaylight.yangtools.yang.common.QName;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 abstract class AbstractTransactionHandler {
33     private enum State {
34         RUNNING,
35         WAITING,
36         SUCCESSFUL,
37         FAILED,
38     }
39
40     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
41
42     static final int SECOND_AS_NANO = 1_000_000_000;
43     //2^20 as in the model
44     static final int MAX_ITEM = 1048576;
45
46     static final QName ID_INTS =
47             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
48     static final QName ID =
49             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
50     static final QName ITEM =
51             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
52     static final QName NUMBER =
53             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
54
55     public static final QName ID_INT =
56             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
57     public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
58     public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
59
60     static final long INIT_TX_TIMEOUT_SECONDS = 125;
61
62     private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
63     private static final AtomicLong COUNTER = new AtomicLong();
64
65     /*
66      * writingExecutor is a single thread executor. Only this thread will write to datastore,
67      * incurring sleep penalties if backend is not responsive. This thread never changes, but reads State.
68      * This thread only adds to futures set.
69      */
70     private final ScheduledExecutorService writingExecutor = newExecutorService("writing");
71     /*
72      * completingExecutor is a single thread executor. Only this thread writes to State.
73      * This thread should never incur any sleep penalty, so RPC response should always come on time.
74      * This thread only removes from futures set.
75      */
76     private final ScheduledExecutorService completingExecutor = newExecutorService("completing");
77     private final Collection<ListenableFuture<?>> futures = Collections.synchronizedSet(new HashSet<>());
78     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
79     private final long runtimeNanos;
80     private final long delayNanos;
81
82     private ScheduledFuture<?> writingFuture;
83     private ScheduledFuture<?> completingFuture;
84     private final AtomicLong txCounter = new AtomicLong();
85     private volatile State state;
86
87     AbstractTransactionHandler(final TransactionsParams params) {
88         runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds().toJava());
89         delayNanos = SECOND_AS_NANO / params.getTransactionsPerSecond().toJava();
90     }
91
92     final synchronized void doStart() {
93         // Setup state first...
94         stopwatch.start();
95         state = State.RUNNING;
96
97         writingFuture = writingExecutor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
98     }
99
100     private void execute() {
101         // Single volatile access
102         final State local = state;
103
104         switch (local) {
105             case FAILED:
106                 // This could happen due to scheduling artifacts
107                 break;
108             case RUNNING:
109                 runningExecute();
110                 break;
111             default:
112                 throw new IllegalStateException("Unhandled state " + local);
113         }
114     }
115
116     private void runningExecute() {
117         final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
118         if (elapsed >= runtimeNanos) {
119             LOG.debug("Reached maximum run time with {} outstanding futures", futures.size());
120             completingExecutor.schedule(this::runtimeUp, 0, TimeUnit.SECONDS);
121             return;
122         }
123
124         // Not completed yet: create a transaction and hook it up
125         final long txId = txCounter.incrementAndGet();
126         final FluentFuture<? extends CommitInfo> execFuture = execWrite(txId);
127         LOG.debug("New future #{} allocated", txId);
128
129         // Ordering is important: we need to add the future before hooking the callback
130         futures.add(execFuture);
131         execFuture.addCallback(new FutureCallback<CommitInfo>() {
132             @Override
133             public void onSuccess(final CommitInfo result) {
134                 txSuccess(execFuture, txId);
135             }
136
137             @Override
138             public void onFailure(final Throwable cause) {
139                 txFailure(execFuture, txId, cause);
140             }
141         }, completingExecutor);
142     }
143
144     private void runtimeUp() {
145         // checkSuccessful has two call sites, it is simpler to create completingFuture unconditionally.
146         completingFuture = completingExecutor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
147         if (!checkSuccessful()) {
148             state = State.WAITING;
149             writingFuture.cancel(false);
150         }
151     }
152
153     private boolean checkSuccessful() {
154         if (futures.isEmpty()) {
155             LOG.debug("Completed waiting for all futures");
156             state = State.SUCCESSFUL;
157             completingFuture.cancel(false);
158             runSuccessful(txCounter.get());
159             shutdownExecutors();
160             return true;
161         }
162
163         return false;
164     }
165
166     final void txSuccess(final ListenableFuture<?> execFuture, final long txId) {
167         LOG.debug("Future #{} completed successfully", txId);
168         futures.remove(execFuture);
169
170         final State local = state;
171         switch (local) {
172             case FAILED:
173             case RUNNING:
174                 // No-op
175                 break;
176             case WAITING:
177                 checkSuccessful();
178                 break;
179             default:
180                 throw new IllegalStateException("Unhandled state " + local);
181         }
182     }
183
184     final void txFailure(final ListenableFuture<?> execFuture, final long txId, final Throwable cause) {
185         LOG.error("Commit future failed for tx # {}", txId, cause);
186         futures.remove(execFuture);
187
188         final State local = state;
189         switch (local) {
190             case FAILED:
191                 // no-op
192                 break;
193             case RUNNING:
194             case WAITING:
195                 state = State.FAILED;
196                 writingFuture.cancel(false);
197                 runFailed(cause, txId);
198                 shutdownExecutors();
199                 break;
200             default:
201                 throw new IllegalStateException("Unhandled state " + local);
202         }
203     }
204
205     private void checkComplete() {
206         final int size = futures.size();
207         if (size == 0) {
208             return;
209         }
210
211         // Guards iteration against concurrent modification from callbacks
212         synchronized (futures) {
213             int offset = 0;
214
215             for (ListenableFuture<?> future : futures) {
216                 try {
217                     future.get(0, TimeUnit.NANOSECONDS);
218                 } catch (final TimeoutException e) {
219                     LOG.warn("Future #{}/{} not completed yet", offset, size);
220                 } catch (final ExecutionException e) {
221                     LOG.warn("Future #{}/{} failed", offset, size, e.getCause());
222                 } catch (final InterruptedException e) {
223                     LOG.warn("Interrupted while examining future #{}/{}", offset, size, e);
224                 }
225
226                 ++offset;
227             }
228         }
229
230         state = State.FAILED;
231         runTimedOut("Transactions did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds");
232         shutdownExecutors();
233     }
234
235     private void shutdownExecutors() {
236         writingExecutor.shutdown();
237         completingExecutor.shutdown();
238     }
239
240     abstract FluentFuture<? extends CommitInfo> execWrite(long txId);
241
242     abstract void runFailed(Throwable cause, long txId);
243
244     abstract void runSuccessful(long allTx);
245
246     abstract void runTimedOut(String cause);
247
248     private ScheduledExecutorService newExecutorService(final String kind) {
249         final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
250             .setDaemon(true)
251             .setNameFormat(getClass().getSimpleName() + "-" + kind + "-" + COUNTER.getAndIncrement() + "%d")
252             .build());
253         executor.setKeepAliveTime(15, TimeUnit.SECONDS);
254         executor.allowCoreThreadTimeOut(true);
255         return executor;
256     }
257 }