2 * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.clustering.it.provider.impl;
10 import com.google.common.base.Stopwatch;
11 import com.google.common.util.concurrent.FluentFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.HashSet;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.ScheduledFuture;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException;
22 import java.util.concurrent.atomic.AtomicLong;
23 import org.opendaylight.mdsal.common.api.CommitInfo;
24 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams;
25 import org.opendaylight.yangtools.yang.common.QName;
26 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
/**
 * Base class for handlers that submit write transactions at a fixed rate for a limited time and then
 * report the overall outcome. Subclasses provide the actual write ({@link #execWrite(long)}) and the
 * outcome hooks ({@link #runSuccessful(long)}, {@link #runFailed(Throwable, long)},
 * {@link #runTimedOut(String)}).
 *
 * <p>NOTE(review): this listing carries the original line numbers as a prefix on every line and skips
 * some of them (31-37 directly below, for instance), so not every line of the class is visible here.
 * The {@code State} type used by the {@code state} field is not declared in the visible lines.
 */
30 abstract class AbstractTransactionHandler {
38 private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
// Number of nanoseconds in one second; the constructor divides it by the requested transaction rate.
40 static final int SECOND_AS_NANO = 1_000_000_000;
41 //2^20 as in the model
42 static final int MAX_ITEM = 1048576;
// Interned QNames, all created in namespace "tag:opendaylight.org,2017:controller:yang:lowlevel:target"
// with revision "2017-02-15". None of them is referenced by the visible methods of this class;
// presumably they are used by subclasses or other classes in the package -- TODO confirm.
44 static final QName ID_INTS =
45 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
46 static final QName ID =
47 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
48 static final QName ITEM =
49 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
50 static final QName NUMBER =
51 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
53 public static final QName ID_INT =
54 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
// Instance identifier consisting of the single ID_INTS node.
55 public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
// ID_INTS_YID extended by the ID_INT node, converted with toOptimized().
56 public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
// Not referenced in the visible lines of this class; presumably consumed elsewhere -- TODO confirm.
58 static final long INIT_TX_TIMEOUT_SECONDS = 125;
// 15 minutes expressed in seconds: the delay runtimeUp() uses when scheduling checkComplete(), and the
// figure quoted in the timeout message passed to runTimedOut().
60 private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
63 * writingExecutor is a single-thread executor. Only this thread will write to the datastore,
64 * incurring sleep penalties if the backend is not responsive. This thread never changes State, it only reads it.
65 * This thread only adds to the futures set.
// doStart() schedules the periodic execute() task on this executor.
67 private final ScheduledExecutorService writingExecutor = FinalizableScheduledExecutorService.newSingleThread();
69 * completingExecutor is a single-thread executor. Only this thread writes to State.
70 * This thread should never incur any sleep penalty, so the RPC response should always arrive on time.
71 * This thread only removes from the futures set.
// Runs runtimeUp(), the delayed checkComplete() and the commit callbacks registered in runningExecute().
73 private final ScheduledExecutorService completingExecutor = FinalizableScheduledExecutorService.newSingleThread();
// Outstanding commit futures: added in runningExecute(), removed in txSuccess()/txFailure().
// The set is wrapped with Collections.synchronizedSet(), so iteration must hold its monitor
// (see checkComplete()).
74 private final Collection<ListenableFuture<?>> futures = Collections.synchronizedSet(new HashSet<>());
// Created unstarted; runningExecute() reads its elapsed time to decide when the run is over.
// NOTE(review): the call that starts it is not visible in this listing -- confirm it happens before
// the first execute().
75 private final Stopwatch stopwatch = Stopwatch.createUnstarted();
// Total run time in nanoseconds, derived from params.getSeconds() in the constructor.
76 private final long runtimeNanos;
// Period between execute() invocations in nanoseconds, derived from the requested transaction rate.
77 private final long delayNanos;
// Handle of the periodic execute() task; assigned in doStart(), cancelled in runtimeUp() and txFailure().
79 private ScheduledFuture<?> writingFuture;
// Handle of the delayed checkComplete() task; assigned in runtimeUp(), cancelled in checkSuccessful().
80 private ScheduledFuture<?> completingFuture;
// Incremented once per transaction in runningExecute(); its value is handed to runSuccessful().
81 private final AtomicLong txCounter = new AtomicLong();
// Current lifecycle state; volatile because it is read and written from different threads.
82 private volatile State state;
/**
 * Derives the total run time and the delay between transactions from the request parameters.
 *
 * @param params source of the run duration in seconds ({@code getSeconds()}) and of the requested rate
 *               ({@code getTransactionsPerSecond()})
 */
84 AbstractTransactionHandler(final TransactionsParams params) {
85 runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds().toJava());
// NOTE(review): assuming toJava() yields an integral primitive (TODO confirm), a rate of 0 makes this
// division throw ArithmeticException, and a rate above SECOND_AS_NANO yields a delay of 0, which
// scheduleAtFixedRate() in doStart() rejects because the period must be positive. Neither case is
// validated in the visible lines.
86 delayNanos = SECOND_AS_NANO / params.getTransactionsPerSecond().toJava();
/**
 * Starts the handler: marks it as running and schedules {@code execute()} on the writing executor at a
 * fixed rate, with no initial delay and a period of {@code delayNanos} nanoseconds.
 *
 * <p>NOTE(review): the embedded numbering skips line 91, so one statement between the comment and the
 * state assignment is not visible in this listing.
 */
89 final synchronized void doStart() {
90 // Setup state first...
92 state = State.RUNNING;
// Keep the handle so that runtimeUp() and txFailure() can cancel the periodic task.
94 writingFuture = writingExecutor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
/**
 * Body of the periodic task scheduled by {@code doStart()}. Reads {@code state} once and acts on that
 * snapshot; a state it does not handle results in an {@link IllegalStateException}.
 *
 * <p>NOTE(review): the embedded numbering skips lines 100-102 and 104-108, so the dispatch on
 * {@code local} (which states are accepted and what is done for each) is not visible in this listing.
 */
97 private void execute() {
98 // Single volatile access
99 final State local = state;
103 // This could happen due to scheduling artifacts
109 throw new IllegalStateException("Unhandled state " + local);
/**
 * One tick of the write task. Once the configured run time has elapsed it hands over to
 * {@code runtimeUp()} on the completing executor; otherwise it allocates the next transaction id,
 * starts the write via {@code execWrite()} and tracks the returned future until its callback fires.
 */
113 private void runningExecute() {
114 final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
115 if (elapsed >= runtimeNanos) {
116 LOG.debug("Reached maximum run time with {} outstanding futures", futures.size());
// Run runtimeUp() on the completing executor, with no delay.
117 completingExecutor.schedule(this::runtimeUp, 0, TimeUnit.SECONDS);
// NOTE(review): lines 118-120 are absent from this listing; presumably this branch returns before
// reaching the code below -- confirm against the full file.
121 // Not completed yet: create a transaction and hook it up
// The counter starts at zero, so the first transaction id is 1.
122 final long txId = txCounter.incrementAndGet();
123 final FluentFuture<? extends CommitInfo> execFuture = execWrite(txId);
124 LOG.debug("New future #{} allocated", txId);
126 // Ordering is important: we need to add the future before hooking the callback
// (the callbacks remove the future from the set, so it has to be in the set before they can run).
127 futures.add(execFuture);
128 execFuture.addCallback(new FutureCallback<CommitInfo>() {
130 public void onSuccess(final CommitInfo result) {
131 txSuccess(execFuture, txId);
135 public void onFailure(final Throwable cause) {
136 txFailure(execFuture, txId, cause);
// Callbacks are dispatched on the completing executor, not on the writing executor.
138 }, completingExecutor);
/**
 * Invoked on the completing executor once the run time has elapsed (scheduled by
 * {@code runningExecute()}). Arms the {@code checkComplete()} timeout and, if
 * {@code checkSuccessful()} returns false, switches to WAITING and cancels the periodic write task.
 */
141 private void runtimeUp() {
142 // checkSuccessful has two call sites, it is simpler to create completingFuture unconditionally.
// checkSuccessful() cancels completingFuture when it succeeds, so the handle must be set before the call.
143 completingFuture = completingExecutor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
144 if (!checkSuccessful()) {
145 state = State.WAITING;
// cancel(false): a tick that is already running is not interrupted.
146 writingFuture.cancel(false);
/**
 * Checks whether every tracked future has completed. When the set of outstanding futures is empty it
 * marks the run SUCCESSFUL, cancels the pending {@code checkComplete()} timeout and reports the
 * number of transactions through {@code runSuccessful()}.
 *
 * <p>NOTE(review): the return statements (lines 156-160) are not visible in this listing; presumably
 * the method returns true on the success path and false otherwise -- confirm against the full file.
 *
 * @return a boolean that {@code runtimeUp()} treats as "the run has finished successfully"
 */
150 private boolean checkSuccessful() {
151 if (futures.isEmpty()) {
152 LOG.debug("Completed waiting for all futures");
153 state = State.SUCCESSFUL;
// Dereferences completingFuture unconditionally, so it has to be assigned before this method runs
// (runtimeUp() does that).
154 completingFuture.cancel(false);
155 runSuccessful(txCounter.get());
/**
 * Success callback for a single transaction; {@code runningExecute()} registers it to run on the
 * completing executor. Stops tracking the future and then acts on a single snapshot of {@code state};
 * a state it does not handle results in an {@link IllegalStateException}.
 *
 * <p>NOTE(review): the embedded numbering skips lines 167-175, so the per-state handling is not
 * visible in this listing.
 *
 * @param execFuture the commit future that completed
 * @param txId id of the transaction the future belongs to
 */
162 final void txSuccess(final ListenableFuture<?> execFuture, final long txId) {
163 LOG.debug("Future #{} completed successfully", txId);
164 futures.remove(execFuture);
// Single volatile read; every decision below uses this snapshot.
166 final State local = state;
176 throw new IllegalStateException("Unhandled state " + local);
/**
 * Failure callback for a single transaction; {@code runningExecute()} registers it to run on the
 * completing executor. Logs the failure, stops tracking the future and then acts on a single snapshot
 * of {@code state}. The visible handling marks the run FAILED, cancels the periodic write task and
 * reports the failure through {@code runFailed()}; a state it does not handle results in an
 * {@link IllegalStateException}.
 *
 * <p>NOTE(review): the embedded numbering skips lines 185-190 and 194-195, so the case labels -- and
 * therefore which states lead to the FAILED transition -- are not visible in this listing.
 *
 * @param execFuture the commit future that failed
 * @param txId id of the transaction the future belongs to
 * @param cause the failure delivered to the callback
 */
180 final void txFailure(final ListenableFuture<?> execFuture, final long txId, final Throwable cause) {
181 LOG.error("Commit future failed for tx # {}", txId, cause);
182 futures.remove(execFuture);
// Single volatile read; every decision below uses this snapshot.
184 final State local = state;
191 state = State.FAILED;
// cancel(false): a tick that is already running is not interrupted.
192 writingFuture.cancel(false);
193 runFailed(cause, txId);
196 throw new IllegalStateException("Unhandled state " + local);
/**
 * Timeout handler scheduled by {@code runtimeUp()} to run {@code DEAD_TIMEOUT_SECONDS} after the run
 * time elapsed. Polls every future that is still tracked, logs what it finds, then marks the run
 * FAILED and reports the timeout through {@code runTimedOut()}.
 *
 * <p>NOTE(review): the embedded numbering skips lines 202-205, 208-209, 211 and 219-224, so the
 * declaration and update of {@code offset}, the opening of the try block and whatever happens between
 * reading {@code size} and entering the synchronized block are not visible in this listing.
 */
200 private void checkComplete() {
// Snapshot taken outside the lock; it is only used in the log messages visible below.
201 final int size = futures.size();
206 // Guards iteration against concurrent modification from callbacks
// (a Collections.synchronizedSet() wrapper does not lock on behalf of iterators).
207 synchronized (futures) {
210 for (ListenableFuture<?> future : futures) {
// Zero timeout: a poll of the future's current status, not a wait.
212 future.get(0, TimeUnit.NANOSECONDS);
213 } catch (final TimeoutException e) {
214 LOG.warn("Future #{}/{} not completed yet", offset, size);
215 } catch (final ExecutionException e) {
216 LOG.warn("Future #{}/{} failed", offset, size, e.getCause());
217 } catch (final InterruptedException e) {
// NOTE(review): the interruption is only logged; no call restoring the thread's interrupt status is
// visible in this listing.
218 LOG.warn("Interrupted while examining future #{}/{}", offset, size, e);
225 state = State.FAILED;
226 runTimedOut("Transactions did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds");
/**
 * Starts the write for one transaction. Called by {@code runningExecute()}, which tracks the returned
 * future and registers success/failure callbacks on it.
 *
 * @param txId id of the transaction, taken from the transaction counter (the first id is 1)
 * @return the commit future of the write; the caller invokes {@code addCallback} on it, so it must
 *         not be null
 */
229 abstract FluentFuture<? extends CommitInfo> execWrite(long txId);
/**
 * Reports a failed run. Called by {@code txFailure()} after the state has been set to FAILED and the
 * periodic write task has been cancelled.
 *
 * @param cause the failure of the transaction that triggered the report
 * @param txId id of that transaction
 */
231 abstract void runFailed(Throwable cause, long txId);
/**
 * Reports a successful run. Called by {@code checkSuccessful()} once no futures are outstanding.
 *
 * @param allTx value of the transaction counter at the time of the call
 */
233 abstract void runSuccessful(long allTx);
/**
 * Reports a run that did not finish in time. Called by {@code checkComplete()} after the state has
 * been set to FAILED.
 *
 * @param cause human-readable description of the timeout
 */
235 abstract void runTimedOut(String cause);