2 * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.clustering.it.provider.impl;
10 import com.google.common.base.Stopwatch;
11 import com.google.common.util.concurrent.FluentFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.HashSet;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.ScheduledThreadPoolExecutor;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import java.util.concurrent.atomic.AtomicLong;
25 import org.opendaylight.mdsal.common.api.CommitInfo;
26 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams;
27 import org.opendaylight.yangtools.yang.common.QName;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
32 abstract class AbstractTransactionHandler {
40 private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
// Number of nanoseconds in one second; the constructor divides it by the requested transaction rate.
42 static final int SECOND_AS_NANO = 1_000_000_000;
43 // 2^20, as in the model
44 static final int MAX_ITEM = 1048576;
// Qualified names in the "lowlevel:target" namespace, revision 2017-02-15; each one is interned on creation.
46 static final QName ID_INTS =
47 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
48 static final QName ID =
49 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
50 static final QName ITEM =
51 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
52 static final QName NUMBER =
53 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
55 public static final QName ID_INT =
56 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
// Instance identifiers built from the names above: the id-ints node, and the id-int node appended to it.
57 public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
58 public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
// NOTE(review): not referenced on any visible line of this class - presumably used by subclasses; confirm.
60 static final long INIT_TX_TIMEOUT_SECONDS = 125;
// 15 minutes expressed in seconds; runtimeUp() uses it as the delay before checkComplete() runs.
62 private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
// Source of the number that newExecutorService() embeds in executor thread names.
63 private static final AtomicLong COUNTER = new AtomicLong();
66 * writingExecutor is a single-thread executor. Only this thread writes to the datastore, incurring
67 * sleep penalties if the backend is not responsive. This thread never changes State, it only reads it.
68 * This thread only adds to the futures set.
70 private final ScheduledExecutorService writingExecutor = newExecutorService("writing");
72 * completingExecutor is a single thread executor. Only this thread writes to State.
73 * This thread should never incur any sleep penalty, so RPC response should always come on time.
74 * This thread only removes from futures set.
76 private final ScheduledExecutorService completingExecutor = newExecutorService("completing");
// Commit futures that have been issued but whose callback has not fired yet. Wrapped in a
// synchronized set because one executor thread adds to it while the other removes from it.
77 private final Collection<ListenableFuture<?>> futures = Collections.synchronizedSet(new HashSet<>());
// Created unstarted; runningExecute() reads its elapsed time to decide when the run is over.
78 private final Stopwatch stopwatch = Stopwatch.createUnstarted();
// Requested run length in nanoseconds, set by the constructor.
79 private final long runtimeNanos;
// Period between scheduled writes in nanoseconds, set by the constructor.
80 private final long delayNanos;
// Handle of the periodic execute() task scheduled in doStart().
82 private ScheduledFuture<?> writingFuture;
// Handle of the delayed checkComplete() task scheduled in runtimeUp().
83 private ScheduledFuture<?> completingFuture;
// Incremented once per transaction issued; its value also serves as the transaction id.
84 private final AtomicLong txCounter = new AtomicLong();
// Volatile because it is written and read from different threads.
85 private volatile State state;
/**
 * Derives the total run time and the delay between writes from the request parameters.
 *
 * @param params supplies the run length via {@code getSeconds()} and the target rate via
 *               {@code getTransactionsPerSecond()}
 */
87 AbstractTransactionHandler(final TransactionsParams params) {
// Requested run length, converted from seconds to nanoseconds.
88 runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds().toJava());
// Period between writes: the nanoseconds in one second divided by the requested rate.
// NOTE(review): assuming toJava() returns an integral type, a rate of zero would make this
// division throw and a rate above SECOND_AS_NANO would truncate the period to zero -
// presumably the model's value range excludes both; confirm.
89 delayNanos = SECOND_AS_NANO / params.getTransactionsPerSecond().toJava();
/**
 * Marks the handler as running and schedules {@code execute()} on the writing executor, first
 * immediately and then once every {@code delayNanos} nanoseconds.
 */
92 final synchronized void doStart() {
93 // Setup state first...
// NOTE(review): the stopwatch is created unstarted and runningExecute() reads its elapsed time,
// but no call that starts it is visible here - confirm it is started before the task is scheduled.
95 state = State.RUNNING;
// Initial delay 0, period delayNanos; the returned handle is kept so the task can be cancelled later.
97 writingFuture = writingExecutor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
/**
 * Periodic task scheduled by {@code doStart()}; acts on a single snapshot of {@code state}.
 *
 * @throws IllegalStateException if the snapshot is a state this method does not handle
 */
100 private void execute() {
101 // Single volatile access
102 final State local = state;
106 // This could happen due to scheduling artifacts
// NOTE(review): which states are handled before the fall-through below is not visible here -
// confirm against the full source.
112 throw new IllegalStateException("Unhandled state " + local);
/**
 * Performs one write cycle. Once the elapsed time reaches {@code runtimeNanos} it hands over to
 * {@code runtimeUp()} on the completing executor; before that it starts a transaction through
 * {@code execWrite()} and tracks the resulting future until its callback fires.
 */
116 private void runningExecute() {
117 final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
118 if (elapsed >= runtimeNanos) {
119 LOG.debug("Reached maximum run time with {} outstanding futures", futures.size());
// Zero delay: queue runtimeUp() on the completing executor without waiting.
120 completingExecutor.schedule(this::runtimeUp, 0, TimeUnit.SECONDS);
124 // Not completed yet: create a transaction and hook it up
125 final long txId = txCounter.incrementAndGet();
126 final FluentFuture<? extends CommitInfo> execFuture = execWrite(txId);
127 LOG.debug("New future #{} allocated", txId);
129 // Ordering is important: we need to add the future before hooking the callback
130 futures.add(execFuture);
// Both callbacks are dispatched on the completing executor and remove the future from the set again.
131 execFuture.addCallback(new FutureCallback<CommitInfo>() {
133 public void onSuccess(final CommitInfo result) {
134 txSuccess(execFuture, txId);
138 public void onFailure(final Throwable cause) {
139 txFailure(execFuture, txId, cause);
141 }, completingExecutor);
/**
 * Scheduled on the completing executor once the run time has elapsed: arms the dead-timeout
 * check and, unless {@code checkSuccessful()} reports success, switches to
 * {@code State.WAITING} and cancels the periodic writer.
 */
144 private void runtimeUp() {
145 // checkSuccessful() has two call sites, so it is simpler to create completingFuture unconditionally.
146 completingFuture = completingExecutor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
147 if (!checkSuccessful()) {
148 state = State.WAITING;
// false: cancel future runs without interrupting one that is already in progress.
149 writingFuture.cancel(false);
/**
 * Finishes the run successfully if no futures remain outstanding: records
 * {@code State.SUCCESSFUL}, cancels the pending dead-timeout check and reports the number of
 * transactions issued to {@code runSuccessful()}.
 *
 * @return NOTE(review): the return statements are not visible here; the caller in
 *         {@code runtimeUp()} treats {@code false} as "still waiting" - confirm
 */
153 private boolean checkSuccessful() {
154 if (futures.isEmpty()) {
155 LOG.debug("Completed waiting for all futures");
156 state = State.SUCCESSFUL;
// completingFuture is the delayed checkComplete() task; it is no longer needed once everything has finished.
157 completingFuture.cancel(false);
158 runSuccessful(txCounter.get());
/**
 * Callback for a successfully committed transaction: stops tracking its future, then acts on
 * the current state.
 *
 * @param execFuture the completed future, removed from {@code futures}
 * @param txId identifier of the transaction, used for logging
 * @throws IllegalStateException if the current state is not one this method handles
 */
166 final void txSuccess(final ListenableFuture<?> execFuture, final long txId) {
167 LOG.debug("Future #{} completed successfully", txId);
168 futures.remove(execFuture);
// Single read of the volatile field; the decision below is made on this snapshot.
// NOTE(review): the per-state handling is not visible here - confirm against the full source.
170 final State local = state;
180 throw new IllegalStateException("Unhandled state " + local);
/**
 * Callback for a failed transaction: stops tracking its future and, in the branch visible
 * below, marks the run as failed, cancels the periodic writer and reports the failure through
 * {@code runFailed()}.
 *
 * @param execFuture the failed future, removed from {@code futures}
 * @param txId identifier of the transaction, used for logging and passed to {@code runFailed()}
 * @param cause the failure reported by the future
 * @throws IllegalStateException if the current state is not one this method handles
 */
184 final void txFailure(final ListenableFuture<?> execFuture, final long txId, final Throwable cause) {
185 LOG.error("Commit future failed for tx # {}", txId, cause);
186 futures.remove(execFuture);
// Single read of the volatile field; the decision below is made on this snapshot.
// NOTE(review): which states lead to the failure branch is not visible here - confirm.
188 final State local = state;
195 state = State.FAILED;
// false: cancel future runs without interrupting one that is already in progress.
196 writingFuture.cancel(false);
197 runFailed(cause, txId);
201 throw new IllegalStateException("Unhandled state " + local);
/**
 * Dead-timeout task scheduled by {@code runtimeUp()}: logs the status of every future that is
 * still outstanding, then marks the run as failed and reports the timeout through
 * {@code runTimedOut()}.
 */
205 private void checkComplete() {
// Sampled before the lock below is taken, so it may differ from the number of futures iterated.
206 final int size = futures.size();
211 // Guards iteration against concurrent modification from callbacks
212 synchronized (futures) {
215 for (ListenableFuture<?> future : futures) {
// Zero timeout: probe the future's current status without waiting for it.
217 future.get(0, TimeUnit.NANOSECONDS);
218 } catch (final TimeoutException e) {
219 LOG.warn("Future #{}/{} not completed yet", offset, size);
220 } catch (final ExecutionException e) {
221 LOG.warn("Future #{}/{} failed", offset, size, e.getCause());
222 } catch (final InterruptedException e) {
// NOTE(review): the interrupt is only logged; no visible line restores the thread's interrupt
// flag - consider Thread.currentThread().interrupt() here.
223 LOG.warn("Interrupted while examining future #{}/{}", offset, size, e);
230 state = State.FAILED;
231 runTimedOut("Transactions did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds");
/**
 * Requests an orderly shutdown of both executors: {@code shutdown()} stops them accepting new
 * tasks and does not wait for termination.
 */
235 private void shutdownExecutors() {
236 writingExecutor.shutdown();
237 completingExecutor.shutdown();
/**
 * Starts the transaction with the given id. Called from {@code runningExecute()} once per write
 * cycle; the returned future is tracked until its success or failure callback fires.
 *
 * @param txId identifier taken from the handler's transaction counter
 * @return future of the transaction's commit
 */
240 abstract FluentFuture<? extends CommitInfo> execWrite(long txId);
/**
 * Invoked from {@code txFailure()} after the handler has been marked as failed.
 *
 * @param cause the failure reported by the transaction's future
 * @param txId identifier of the transaction that failed
 */
242 abstract void runFailed(Throwable cause, long txId);
/**
 * Invoked from {@code checkSuccessful()} once no futures remain outstanding.
 *
 * @param allTx value of the transaction counter at that point
 */
244 abstract void runSuccessful(long allTx);
/**
 * Invoked from {@code checkComplete()} when the dead-timeout check runs and marks the run as failed.
 *
 * @param cause human-readable description of the timeout
 */
246 abstract void runTimedOut(String cause);
/**
 * Creates a scheduled executor with a core pool of one thread, named after this handler's
 * simple class name, the given kind and the next value of {@code COUNTER}.
 *
 * @param kind label embedded in the thread name; the visible callers pass {@code "writing"} and
 *             {@code "completing"}
 * @return NOTE(review): the return statement is not visible here - presumably the configured
 *         executor; confirm
 */
248 private ScheduledExecutorService newExecutorService(final String kind) {
249 final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
// NOTE(review): the counter value is immediately followed by the "%d" placeholder with no
// separator, so the two numbers run together in the thread name - confirm this is intended.
251 .setNameFormat(getClass().getSimpleName() + "-" + kind + "-" + COUNTER.getAndIncrement() + "%d")
// Allow the single core thread to exit after 15 seconds of idleness.
253 executor.setKeepAliveTime(15, TimeUnit.SECONDS);
254 executor.allowCoreThreadTimeOut(true);