If AbstractTransactionHandler uses only one executor thread,
future completion callbacks are delayed by throttling on writes.
CSIT aims to detect RequestTimeoutException within a narrow window,
so a separate executor for callbacks is used now.
The delay would not be that critical, but the problem is the timing
between a scheduled execution which exceeds scheduling gaps. These
seem to hold up normally-submitted tasks, leading to futures never
completing.
Therefore we use two Executors and synchronize state modification
call sites. Hence the two tasks (throttled producer) and future
completions can run concurrently (aside from state synchronization).
Change-Id: I642c5295ab6188b2d7e1b5feae62ab7ef52d41eb
Signed-off-by: Vratko Polak <vrpolak@cisco.com>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit
8744119235b90d89021567e5f12361d98b823b8f)
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collection;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.HashSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
- private final ScheduledExecutorService executor = FinalizableScheduledExecutorService.newSingleThread();
- private final Collection<ListenableFuture<Void>> futures = new HashSet<>();
+ /*
+ * writingExecutor is a single thread executor. Only this thread will write to datastore,
+ * incurring sleep penalties if backend is not responsive. This thread never changes, but reads State.
+ * This thread only adds to futures set.
+ */
+ private final ScheduledExecutorService writingExecutor = FinalizableScheduledExecutorService.newSingleThread();
+ /*
+ * completingExecutor is a single thread executor. Only this thread writes to State.
+ * This thread should never incur any sleep penalty, so RPC response should always come on time.
+ * This thread only removes from futures set.
+ */
+ private final ScheduledExecutorService completingExecutor = FinalizableScheduledExecutorService.newSingleThread();
+ private final Collection<ListenableFuture<Void>> futures = Collections.synchronizedSet(new HashSet<>());
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final long runtimeNanos;
private final long delayNanos;
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final long runtimeNanos;
private final long delayNanos;
- private ScheduledFuture<?> scheduledFuture;
+ private ScheduledFuture<?> writingFuture;
+ private ScheduledFuture<?> completingFuture;
+ private volatile State state;
AbstractTransactionHandler(final TransactionsParams params) {
runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds());
AbstractTransactionHandler(final TransactionsParams params) {
runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds());
}
final synchronized void doStart() {
}
final synchronized void doStart() {
- scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
+ // Setup state first...
stopwatch.start();
state = State.RUNNING;
stopwatch.start();
state = State.RUNNING;
+
+ writingFuture = writingExecutor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
- private synchronized void execute() {
- switch (state) {
+ private void execute() {
+ // Single volatile access
+ final State local = state;
+
+ switch (local) {
case FAILED:
// This could happen due to scheduling artifacts
break;
case FAILED:
// This could happen due to scheduling artifacts
break;
runningExecute();
break;
default:
runningExecute();
break;
default:
- throw new IllegalStateException("Unhandled state " + state);
+ throw new IllegalStateException("Unhandled state " + local);
final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
if (elapsed >= runtimeNanos) {
LOG.debug("Reached maximum run time with {} outstanding futures", futures.size());
final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
if (elapsed >= runtimeNanos) {
LOG.debug("Reached maximum run time with {} outstanding futures", futures.size());
- if (!checkSuccessful()) {
- state = State.WAITING;
- scheduledFuture.cancel(false);
- scheduledFuture = executor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
- }
-
+ completingExecutor.schedule(this::runtimeUp, 0, TimeUnit.SECONDS);
public void onFailure(final Throwable cause) {
txFailure(execFuture, txId, cause);
}
public void onFailure(final Throwable cause) {
txFailure(execFuture, txId, cause);
}
+ }, completingExecutor);
+ }
+
+ private void runtimeUp() {
+ // checkSuccessful has two call sites, it is simpler to create completingFuture unconditionally.
+ completingFuture = completingExecutor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ if (!checkSuccessful()) {
+ state = State.WAITING;
+ writingFuture.cancel(false);
+ }
+ }
+
+ private boolean checkSuccessful() {
+ if (futures.isEmpty()) {
+ LOG.debug("Completed waiting for all futures");
+ state = State.SUCCESSFUL;
+ completingFuture.cancel(false);
+ runSuccessful(txCounter);
+ return true;
+ }
+
+ return false;
}
final void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
LOG.debug("Future #{} completed successfully", txId);
futures.remove(execFuture);
}
final void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
LOG.debug("Future #{} completed successfully", txId);
futures.remove(execFuture);
+ final State local = state;
+ switch (local) {
case FAILED:
case RUNNING:
// No-op
case FAILED:
case RUNNING:
// No-op
checkSuccessful();
break;
default:
checkSuccessful();
break;
default:
- throw new IllegalStateException("Unhandled state " + state);
+ throw new IllegalStateException("Unhandled state " + local);
LOG.debug("Future #{} failed", txId, cause);
futures.remove(execFuture);
LOG.debug("Future #{} failed", txId, cause);
futures.remove(execFuture);
+ final State local = state;
+ switch (local) {
case FAILED:
// no-op
break;
case RUNNING:
case WAITING:
state = State.FAILED;
case FAILED:
// no-op
break;
case RUNNING:
case WAITING:
state = State.FAILED;
- scheduledFuture.cancel(false);
+ writingFuture.cancel(false);
runFailed(cause);
break;
default:
runFailed(cause);
break;
default:
- throw new IllegalStateException("Unhandled state " + state);
+ throw new IllegalStateException("Unhandled state " + local);
- int offset = 0;
- for (ListenableFuture<Void> future : futures) {
- try {
- future.get(0, TimeUnit.NANOSECONDS);
- } catch (final TimeoutException e) {
- LOG.warn("Future #{}/{} not completed yet", offset, size);
- } catch (final ExecutionException e) {
- LOG.warn("Future #{}/{} failed", offset, size, e.getCause());
- } catch (final InterruptedException e) {
- LOG.warn("Interrupted while examining future #{}/{}", offset, size, e);
+ // Guards iteration against concurrent modification from callbacks
+ synchronized (futures) {
+ int offset = 0;
+
+ for (ListenableFuture<Void> future : futures) {
+ try {
+ future.get(0, TimeUnit.NANOSECONDS);
+ } catch (final TimeoutException e) {
+ LOG.warn("Future #{}/{} not completed yet", offset, size);
+ } catch (final ExecutionException e) {
+ LOG.warn("Future #{}/{} failed", offset, size, e.getCause());
+ } catch (final InterruptedException e) {
+ LOG.warn("Interrupted while examining future #{}/{}", offset, size, e);
+ }
+
+ ++offset;
}
state = State.FAILED;
runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
}
}
state = State.FAILED;
runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
}
- private boolean checkSuccessful() {
- if (futures.isEmpty()) {
- LOG.debug("Completed waiting for all futures");
- state = State.SUCCESSFUL;
- scheduledFuture.cancel(false);
- runSuccessful(txCounter);
- return true;
- }
-
- return false;
- }
-
abstract ListenableFuture<Void> execWrite(final long txId);
abstract void runFailed(Throwable cause);
abstract ListenableFuture<Void> execWrite(final long txId);
abstract void runFailed(Throwable cause);
@Override
public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
final AsyncTransaction<?, ?> transaction, final Throwable cause) {
@Override
public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
final AsyncTransaction<?, ?> transaction, final Throwable cause) {
- LOG.warn("Transaction chain failed.", cause);
- completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build());
+ // This is expected to happen frequently in isolation testing.
+ LOG.debug("Transaction chain failed.", cause);
+ // Do not return RPC here, rely on transaction failure to call runFailed.