Bug 8494: Separate writing and completion threads
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / impl / AbstractTransactionHandler.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.clustering.it.provider.impl;
9
10 import com.google.common.base.Stopwatch;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.HashSet;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.ScheduledFuture;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException;
22 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 abstract class AbstractTransactionHandler {
29     private enum State {
30         RUNNING,
31         WAITING,
32         SUCCESSFUL,
33         FAILED,
34     }
35
36     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
37
38     static final int SECOND_AS_NANO = 1_000_000_000;
39     //2^20 as in the model
40     static final int MAX_ITEM = 1048576;
41
42     static final QName ID_INTS =
43             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
44     static final QName ID =
45             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
46     static final QName ITEM =
47             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
48     static final QName NUMBER =
49             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
50
51     public static final QName ID_INT =
52             QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
53     public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
54     public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
55
56     static final long INIT_TX_TIMEOUT_SECONDS = 125;
57
58     private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
59
60     /*
61      * writingExecutor is a single thread executor. Only this thread will write to datastore,
62      * incurring sleep penalties if backend is not responsive. This thread never changes, but reads State.
63      * This thread only adds to futures set.
64      */
65     private final ScheduledExecutorService writingExecutor = FinalizableScheduledExecutorService.newSingleThread();
66     /*
67      * completingExecutor is a single thread executor. Only this thread writes to State.
68      * This thread should never incur any sleep penalty, so RPC response should always come on time.
69      * This thread only removes from futures set.
70      */
71     private final ScheduledExecutorService completingExecutor = FinalizableScheduledExecutorService.newSingleThread();
72     private final Collection<ListenableFuture<Void>> futures = Collections.synchronizedSet(new HashSet<>());
73     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
74     private final long runtimeNanos;
75     private final long delayNanos;
76
77     private ScheduledFuture<?> writingFuture;
78     private ScheduledFuture<?> completingFuture;
79     private long txCounter;
80     private volatile State state;
81
82     AbstractTransactionHandler(final TransactionsParams params) {
83         runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds());
84         delayNanos = SECOND_AS_NANO / params.getTransactionsPerSecond();
85     }
86
87     final synchronized void doStart() {
88         // Setup state first...
89         stopwatch.start();
90         state = State.RUNNING;
91
92         writingFuture = writingExecutor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
93     }
94
95     private void execute() {
96         // Single volatile access
97         final State local = state;
98
99         switch (local) {
100             case FAILED:
101                 // This could happen due to scheduling artifacts
102                 break;
103             case RUNNING:
104                 runningExecute();
105                 break;
106             default:
107                 throw new IllegalStateException("Unhandled state " + local);
108         }
109     }
110
111     private void runningExecute() {
112         final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
113         if (elapsed >= runtimeNanos) {
114             LOG.debug("Reached maximum run time with {} outstanding futures", futures.size());
115             completingExecutor.schedule(this::runtimeUp, 0, TimeUnit.SECONDS);
116             return;
117         }
118
119         // Not completed yet: create a transaction and hook it up
120         final long txId = txCounter++;
121         final ListenableFuture<Void> execFuture = execWrite(txId);
122         LOG.debug("New future #{} allocated", txId);
123
124         // Ordering is important: we need to add the future before hooking the callback
125         futures.add(execFuture);
126         Futures.addCallback(execFuture, new FutureCallback<Void>() {
127             @Override
128             public void onSuccess(final Void result) {
129                 txSuccess(execFuture, txId);
130             }
131
132             @Override
133             public void onFailure(final Throwable cause) {
134                 txFailure(execFuture, txId, cause);
135             }
136         }, completingExecutor);
137     }
138
139     private void runtimeUp() {
140         // checkSuccessful has two call sites, it is simpler to create completingFuture unconditionally.
141         completingFuture = completingExecutor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
142         if (!checkSuccessful()) {
143             state = State.WAITING;
144             writingFuture.cancel(false);
145         }
146     }
147
148     private boolean checkSuccessful() {
149         if (futures.isEmpty()) {
150             LOG.debug("Completed waiting for all futures");
151             state = State.SUCCESSFUL;
152             completingFuture.cancel(false);
153             runSuccessful(txCounter);
154             return true;
155         }
156
157         return false;
158     }
159
160     final void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
161         LOG.debug("Future #{} completed successfully", txId);
162         futures.remove(execFuture);
163
164         final State local = state;
165         switch (local) {
166             case FAILED:
167             case RUNNING:
168                 // No-op
169                 break;
170             case WAITING:
171                 checkSuccessful();
172                 break;
173             default:
174                 throw new IllegalStateException("Unhandled state " + local);
175         }
176     }
177
178     final void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
179         LOG.debug("Future #{} failed", txId, cause);
180         futures.remove(execFuture);
181
182         final State local = state;
183         switch (local) {
184             case FAILED:
185                 // no-op
186                 break;
187             case RUNNING:
188             case WAITING:
189                 state = State.FAILED;
190                 writingFuture.cancel(false);
191                 runFailed(cause);
192                 break;
193             default:
194                 throw new IllegalStateException("Unhandled state " + local);
195         }
196     }
197
198     private void checkComplete() {
199         final int size = futures.size();
200         if (size == 0) {
201             return;
202         }
203
204         // Guards iteration against concurrent modification from callbacks
205         synchronized (futures) {
206             int offset = 0;
207
208             for (ListenableFuture<Void> future : futures) {
209                 try {
210                     future.get(0, TimeUnit.NANOSECONDS);
211                 } catch (final TimeoutException e) {
212                     LOG.warn("Future #{}/{} not completed yet", offset, size);
213                 } catch (final ExecutionException e) {
214                     LOG.warn("Future #{}/{} failed", offset, size, e.getCause());
215                 } catch (final InterruptedException e) {
216                     LOG.warn("Interrupted while examining future #{}/{}", offset, size, e);
217                 }
218
219                 ++offset;
220             }
221         }
222
223         state = State.FAILED;
224         runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
225     }
226
227     abstract ListenableFuture<Void> execWrite(final long txId);
228
229     abstract void runFailed(Throwable cause);
230
231     abstract void runSuccessful(long allTx);
232
233     abstract void runTimedOut(Exception cause);
234 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.