From: Robert Varga Date: Wed, 28 Jun 2017 09:34:34 +0000 (+0200) Subject: BUG-8494: rework AbstractTransactionHandler X-Git-Tag: release/nitrogen~68 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=e9bda7f551d3c514e23f2d282a1ef1197efa382f BUG-8494: rework AbstractTransactionHandler If a transaction fails while we are still producing transactions, detection of the failure could be delayed, because we would keep jamming in new transactions. Rework the internal logic to halt processing as soon as a failure is seen, speeding up detection and simplifying the code. Change-Id: I19d13c78d94bb39481abde477ec4e3df03a6aa57 Signed-off-by: Robert Varga (cherry picked from commit b7657c3ac7b4697372674b75e820581a6d59e2ba) --- diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java index 250b3b30e9..ea0749a2d9 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java @@ -12,10 +12,6 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Optional; import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -23,7 +19,6 @@ import java.util.concurrent.ScheduledExecutorService; import 
java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import javax.annotation.concurrent.GuardedBy; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -31,61 +26,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; abstract class AbstractTransactionHandler { - private abstract static class Phase { - abstract void txSuccess(ListenableFuture execFuture, long txId); - - abstract void txFailure(ListenableFuture execFuture, long txId, Throwable cause); - } - - private static final class Running extends Phase { - private final Queue> futures = new ArrayDeque<>(); - private Throwable failure; - - void addFuture(final ListenableFuture execFuture) { - futures.add(execFuture); - } - - @Override - void txSuccess(final ListenableFuture execFuture, final long txId) { - futures.remove(execFuture); - } - - @Override - void txFailure(final ListenableFuture execFuture, final long txId, final Throwable cause) { - futures.remove(execFuture); - if (failure == null) { - failure = cause; - } - } - - Optional getFailure() { - return Optional.ofNullable(failure); - } - } - - private final class Collecting extends Phase { - private final List> futures; - private boolean done; - - Collecting(final Collection> futures) { - this.futures = new ArrayList<>(futures); - } - - @Override - void txSuccess(final ListenableFuture execFuture, final long txId) { - futures.remove(execFuture); - if (futures.isEmpty() && !done) { - LOG.debug("All futures completed successfully."); - runSuccessful(txCounter); - } - } - - @Override - void txFailure(final ListenableFuture execFuture, final long txId, final Throwable cause) { - futures.remove(execFuture); - done = true; - runFailed(cause); - } + private enum State { + RUNNING, + 
WAITING, + SUCCESSFUL, + FAILED, } private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class); @@ -113,14 +58,14 @@ abstract class AbstractTransactionHandler { private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(5); private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - private final Stopwatch stopwatch = Stopwatch.createStarted(); + private final Queue> futures = new ArrayDeque<>(); + private final Stopwatch stopwatch = Stopwatch.createUnstarted(); private final long runtimeNanos; private final long delayNanos; private ScheduledFuture scheduledFuture; private long txCounter; - @GuardedBy("this") - private Phase phase; + private State state; AbstractTransactionHandler(final TransactionsParams params) { runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds()); @@ -128,90 +73,131 @@ abstract class AbstractTransactionHandler { } final synchronized void doStart() { - phase = new Running(); scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS); + stopwatch.start(); + state = State.RUNNING; } - private void execute() { - final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); - if (elapsed < runtimeNanos) { - // Not completed yet: create a transaction and hook it up - final long txId = txCounter++; - final ListenableFuture execFuture = execWrite(txId); - - // Ordering is important: we need to add the future before hooking the callback - synchronized (this) { - ((Running) phase).addFuture(execFuture); - } - Futures.addCallback(execFuture, new FutureCallback() { - @Override - public void onSuccess(final Void result) { - txSuccess(execFuture, txId); - } - - @Override - public void onFailure(final Throwable cause) { - txFailure(execFuture, txId, cause); - } - }); - } else { - startCollection(); + private synchronized void execute() { + switch (state) { + case FAILED: + // This could happen due to scheduling artifacts + 
break; + case RUNNING: + runningExecute(); + break; + default: + throw new IllegalStateException("Unhandled state " + state); } } - private synchronized void startCollection() { - scheduledFuture.cancel(false); + private void runningExecute() { + final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); + if (elapsed >= runtimeNanos) { + LOG.debug("Reached maximum run time with {} outstanding futures", futures.size()); + if (!checkSuccessful()) { + state = State.WAITING; + scheduledFuture.cancel(false); + scheduledFuture = executor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS); + executor.shutdown(); + } - final Running running = (Running) phase; - final Optional failure = running.getFailure(); - if (failure.isPresent()) { - executor.shutdown(); - runFailed(failure.get()); return; } - LOG.debug("Reached maximum run time with {} outstanding futures", running.futures.size()); - if (running.futures.isEmpty()) { - executor.shutdown(); - runSuccessful(txCounter); - return; - } + // Not completed yet: create a transaction and hook it up + final long txId = txCounter++; + final ListenableFuture execFuture = execWrite(txId); + LOG.debug("New future #{} allocated", txId); + + // Ordering is important: we need to add the future before hooking the callback + futures.add(execFuture); + Futures.addCallback(execFuture, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + txSuccess(execFuture, txId); + } - phase = new Collecting(running.futures); - executor.schedule(this::checkCollection, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS); - executor.shutdown(); + @Override + public void onFailure(final Throwable cause) { + txFailure(execFuture, txId, cause); + } + }); } final synchronized void txSuccess(final ListenableFuture execFuture, final long txId) { LOG.debug("Future #{} completed successfully", txId); - phase.txSuccess(execFuture, txId); + futures.remove(execFuture); + + switch (state) { + case FAILED: + case RUNNING: + // 
No-op + break; + case WAITING: + checkSuccessful(); + break; + default: + throw new IllegalStateException("Unhandled state " + state); + } } final synchronized void txFailure(final ListenableFuture execFuture, final long txId, final Throwable cause) { LOG.debug("Future #{} failed", txId, cause); - phase.txFailure(execFuture, txId, cause); + futures.remove(execFuture); + + switch (state) { + case FAILED: + // no-op + break; + case RUNNING: + case WAITING: + state = State.FAILED; + scheduledFuture.cancel(false); + executor.shutdown(); + runFailed(cause); + break; + default: + throw new IllegalStateException("Unhandled state " + state); + } } - private synchronized void checkCollection() { - final Collecting collecting = (Collecting) phase; - if (!collecting.done) { - final int size = collecting.futures.size(); - for (int i = 0; i < size; i++) { - final ListenableFuture future = collecting.futures.get(i); - - try { - future.get(0, TimeUnit.NANOSECONDS); - } catch (final TimeoutException e) { - LOG.warn("Future #{}/{} not completed yet", i, size); - } catch (final ExecutionException e) { - LOG.warn("Future #{}/{} failed", i, size, e.getCause()); - } catch (final InterruptedException e) { - LOG.warn("Interrupted while examining future #{}/{}", i, size, e); - } + private synchronized void checkComplete() { + final int size = futures.size(); + if (size == 0) { + return; + } + + int offset = 0; + for (ListenableFuture future : futures) { + try { + future.get(0, TimeUnit.NANOSECONDS); + } catch (final TimeoutException e) { + LOG.warn("Future #{}/{} not completed yet", offset, size); + } catch (final ExecutionException e) { + LOG.warn("Future #{}/{} failed", offset, size, e.getCause()); + } catch (final InterruptedException e) { + LOG.warn("Interrupted while examining future #{}/{}", offset, size, e); } - runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds")); + ++offset; } + + state = State.FAILED; + runTimedOut(new 
TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds")); + } + + private boolean checkSuccessful() { + if (futures.isEmpty()) { + LOG.debug("Completed waiting for all futures"); + state = State.SUCCESSFUL; + scheduledFuture.cancel(false); + executor.shutdown(); + runSuccessful(txCounter); + return true; + } + + return false; } abstract ListenableFuture execWrite(final long txId);