From df80cb74afc5dee73bbd930133b06add52801225 Mon Sep 17 00:00:00 2001 From: Vratko Polak Date: Fri, 21 Jul 2017 12:24:49 +0200 Subject: [PATCH] Bug 8494: Separate writing and completion threads If AbstractTransactionHandler uses only one executor thread, future completion callbacks are delayed by throttling on writes. CSIT aims to detect RequestTimeoutException within a narrow window, so a separate executor for callbacks is used now. The delay would not be that critical, but the problem is the timing between a scheduled execution which exceeds scheduling gaps. These seem to hold up normally-submitted tasks, leading to futures never completing. Therefore we use two Executors and synchronize state modification call sites. Hence the two tasks (throttled producer) and future completions can run concurrently (aside from state synchronization). Change-Id: I642c5295ab6188b2d7e1b5feae62ab7ef52d41eb Signed-off-by: Vratko Polak Signed-off-by: Robert Varga (cherry picked from commit 8744119235b90d89021567e5f12361d98b823b8f) --- .../impl/AbstractTransactionHandler.java | 116 +++++++++++------- .../impl/WriteTransactionsHandler.java | 6 +- 2 files changed, 75 insertions(+), 47 deletions(-) diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java index d0923ce6de..d3b0a7b049 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java @@ -12,6 +12,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; @@ -56,15 +57,27 @@ abstract class AbstractTransactionHandler { private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15); - private final ScheduledExecutorService executor = FinalizableScheduledExecutorService.newSingleThread(); - private final Collection> futures = new HashSet<>(); + /* + * writingExecutor is a single thread executor. Only this thread will write to datastore, + * incurring sleep penalties if backend is not responsive. This thread never changes, but reads State. + * This thread only adds to futures set. + */ + private final ScheduledExecutorService writingExecutor = FinalizableScheduledExecutorService.newSingleThread(); + /* + * completingExecutor is a single thread executor. Only this thread writes to State. + * This thread should never incur any sleep penalty, so RPC response should always come on time. + * This thread only removes from futures set. + */ + private final ScheduledExecutorService completingExecutor = FinalizableScheduledExecutorService.newSingleThread(); + private final Collection> futures = Collections.synchronizedSet(new HashSet<>()); private final Stopwatch stopwatch = Stopwatch.createUnstarted(); private final long runtimeNanos; private final long delayNanos; - private ScheduledFuture scheduledFuture; + private ScheduledFuture writingFuture; + private ScheduledFuture completingFuture; private long txCounter; - private State state; + private volatile State state; AbstractTransactionHandler(final TransactionsParams params) { runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds()); @@ -72,13 +85,18 @@ abstract class AbstractTransactionHandler { } final synchronized void doStart() { - scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS); + // Setup state first... stopwatch.start(); state = State.RUNNING; + + writingFuture = writingExecutor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS); } - private synchronized void execute() { - switch (state) { + private void execute() { + // Single volatile access + final State local = state; + + switch (local) { case FAILED: // This could happen due to scheduling artifacts break; @@ -86,7 +104,7 @@ abstract class AbstractTransactionHandler { runningExecute(); break; default: - throw new IllegalStateException("Unhandled state " + state); + throw new IllegalStateException("Unhandled state " + local); } } @@ -94,12 +112,7 @@ abstract class AbstractTransactionHandler { final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); if (elapsed >= runtimeNanos) { LOG.debug("Reached maximum run time with {} outstanding futures", futures.size()); - if (!checkSuccessful()) { - state = State.WAITING; - scheduledFuture.cancel(false); - scheduledFuture = executor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS); - } - + completingExecutor.schedule(this::runtimeUp, 0, TimeUnit.SECONDS); return; } @@ -120,14 +133,36 @@ abstract class AbstractTransactionHandler { public void onFailure(final Throwable cause) { txFailure(execFuture, txId, cause); } - }, executor); + }, completingExecutor); + } + + private void runtimeUp() { + // checkSuccessful has two call sites, it is simpler to create completingFuture unconditionally. + completingFuture = completingExecutor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (!checkSuccessful()) { + state = State.WAITING; + writingFuture.cancel(false); + } + } + + private boolean checkSuccessful() { + if (futures.isEmpty()) { + LOG.debug("Completed waiting for all futures"); + state = State.SUCCESSFUL; + completingFuture.cancel(false); + runSuccessful(txCounter); + return true; + } + + return false; } final void txSuccess(final ListenableFuture execFuture, final long txId) { LOG.debug("Future #{} completed successfully", txId); futures.remove(execFuture); - switch (state) { + final State local = state; + switch (local) { case FAILED: case RUNNING: // No-op @@ -136,7 +171,7 @@ abstract class AbstractTransactionHandler { checkSuccessful(); break; default: - throw new IllegalStateException("Unhandled state " + state); + throw new IllegalStateException("Unhandled state " + local); } } @@ -144,18 +179,19 @@ abstract class AbstractTransactionHandler { LOG.debug("Future #{} failed", txId, cause); futures.remove(execFuture); - switch (state) { + final State local = state; + switch (local) { case FAILED: // no-op break; case RUNNING: case WAITING: state = State.FAILED; - scheduledFuture.cancel(false); + writingFuture.cancel(false); runFailed(cause); break; default: - throw new IllegalStateException("Unhandled state " + state); + throw new IllegalStateException("Unhandled state " + local); } } @@ -165,37 +201,29 @@ abstract class AbstractTransactionHandler { return; } - int offset = 0; - for (ListenableFuture future : futures) { - try { - future.get(0, TimeUnit.NANOSECONDS); - } catch (final TimeoutException e) { - LOG.warn("Future #{}/{} not completed yet", offset, size); - } catch (final ExecutionException e) { - LOG.warn("Future #{}/{} failed", offset, size, e.getCause()); - } catch (final InterruptedException e) { - LOG.warn("Interrupted while examining future #{}/{}", offset, size, e); + // Guards iteration against concurrent modification from callbacks + synchronized (futures) { + int offset = 0; + + for (ListenableFuture future : futures) { + try { + future.get(0, TimeUnit.NANOSECONDS); + } catch (final TimeoutException e) { + LOG.warn("Future #{}/{} not completed yet", offset, size); + } catch (final ExecutionException e) { + LOG.warn("Future #{}/{} failed", offset, size, e.getCause()); + } catch (final InterruptedException e) { + LOG.warn("Interrupted while examining future #{}/{}", offset, size, e); + } + + ++offset; } - - ++offset; } state = State.FAILED; runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds")); } - private boolean checkSuccessful() { - if (futures.isEmpty()) { - LOG.debug("Completed waiting for all futures"); - state = State.SUCCESSFUL; - scheduledFuture.cancel(false); - runSuccessful(txCounter); - return true; - } - - return false; - } - abstract ListenableFuture execWrite(final long txId); abstract void runFailed(Throwable cause); diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java index 797e252a91..a8363c1d2f 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java @@ -68,9 +68,9 @@ public abstract class WriteTransactionsHandler extends AbstractTransactionHandle @Override public void onTransactionChainFailed(final TransactionChain chain, final AsyncTransaction transaction, final Throwable cause) { - LOG.warn("Transaction chain failed.", cause); - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build()); + // This is expected to happen frequently in isolation testing. + LOG.debug("Transaction chain failed.", cause); + // Do not return RPC here, rely on transaction failure to call runFailed. } @Override -- 2.36.6