X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2Fimpl%2FAbstractTransactionHandler.java;h=d3b0a7b0497fbbc204edb6900738cc15fa0005fa;hp=d0923ce6de7a5b4486f2292231c3fcc0f93dc471;hb=df80cb74afc5dee73bbd930133b06add52801225;hpb=cc4d0505cacbca16f1a8a751a794c4091329db0d diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java index d0923ce6de..d3b0a7b049 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java @@ -12,6 +12,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; @@ -56,15 +57,27 @@ abstract class AbstractTransactionHandler { private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15); - private final ScheduledExecutorService executor = FinalizableScheduledExecutorService.newSingleThread(); - private final Collection> futures = new HashSet<>(); + /* + * writingExecutor is a single thread executor. Only this thread will write to datastore, + * incurring sleep penalties if backend is not responsive. This thread never changes, but reads State. + * This thread only adds to futures set. + */ + private final ScheduledExecutorService writingExecutor = FinalizableScheduledExecutorService.newSingleThread(); + /* + * completingExecutor is a single thread executor. Only this thread writes to State. + * This thread should never incur any sleep penalty, so RPC response should always come on time. + * This thread only removes from futures set. + */ + private final ScheduledExecutorService completingExecutor = FinalizableScheduledExecutorService.newSingleThread(); + private final Collection> futures = Collections.synchronizedSet(new HashSet<>()); private final Stopwatch stopwatch = Stopwatch.createUnstarted(); private final long runtimeNanos; private final long delayNanos; - private ScheduledFuture scheduledFuture; + private ScheduledFuture writingFuture; + private ScheduledFuture completingFuture; private long txCounter; - private State state; + private volatile State state; AbstractTransactionHandler(final TransactionsParams params) { runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds()); @@ -72,13 +85,18 @@ abstract class AbstractTransactionHandler { } final synchronized void doStart() { - scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS); + // Setup state first... stopwatch.start(); state = State.RUNNING; + + writingFuture = writingExecutor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS); } - private synchronized void execute() { - switch (state) { + private void execute() { + // Single volatile access + final State local = state; + + switch (local) { case FAILED: // This could happen due to scheduling artifacts break; @@ -86,7 +104,7 @@ abstract class AbstractTransactionHandler { runningExecute(); break; default: - throw new IllegalStateException("Unhandled state " + state); + throw new IllegalStateException("Unhandled state " + local); } } @@ -94,12 +112,7 @@ abstract class AbstractTransactionHandler { final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); if (elapsed >= runtimeNanos) { LOG.debug("Reached maximum run time with {} outstanding futures", futures.size()); - if (!checkSuccessful()) { - state = State.WAITING; - scheduledFuture.cancel(false); - scheduledFuture = executor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS); - } - + completingExecutor.schedule(this::runtimeUp, 0, TimeUnit.SECONDS); return; } @@ -120,14 +133,36 @@ abstract class AbstractTransactionHandler { public void onFailure(final Throwable cause) { txFailure(execFuture, txId, cause); } - }, executor); + }, completingExecutor); + } + + private void runtimeUp() { + // checkSuccessful has two call sites, it is simpler to create completingFuture unconditionally. + completingFuture = completingExecutor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (!checkSuccessful()) { + state = State.WAITING; + writingFuture.cancel(false); + } + } + + private boolean checkSuccessful() { + if (futures.isEmpty()) { + LOG.debug("Completed waiting for all futures"); + state = State.SUCCESSFUL; + completingFuture.cancel(false); + runSuccessful(txCounter); + return true; + } + + return false; } final void txSuccess(final ListenableFuture execFuture, final long txId) { LOG.debug("Future #{} completed successfully", txId); futures.remove(execFuture); - switch (state) { + final State local = state; + switch (local) { case FAILED: case RUNNING: // No-op @@ -136,7 +171,7 @@ abstract class AbstractTransactionHandler { checkSuccessful(); break; default: - throw new IllegalStateException("Unhandled state " + state); + throw new IllegalStateException("Unhandled state " + local); } } @@ -144,18 +179,19 @@ abstract class AbstractTransactionHandler { LOG.debug("Future #{} failed", txId, cause); futures.remove(execFuture); - switch (state) { + final State local = state; + switch (local) { case FAILED: // no-op break; case RUNNING: case WAITING: state = State.FAILED; - scheduledFuture.cancel(false); + writingFuture.cancel(false); runFailed(cause); break; default: - throw new IllegalStateException("Unhandled state " + state); + throw new IllegalStateException("Unhandled state " + local); } } @@ -165,37 +201,29 @@ abstract class AbstractTransactionHandler { return; } - int offset = 0; - for (ListenableFuture future : futures) { - try { - future.get(0, TimeUnit.NANOSECONDS); - } catch (final TimeoutException e) { - LOG.warn("Future #{}/{} not completed yet", offset, size); - } catch (final ExecutionException e) { - LOG.warn("Future #{}/{} failed", offset, size, e.getCause()); - } catch (final InterruptedException e) { - LOG.warn("Interrupted while examining future #{}/{}", offset, size, e); + // Guards iteration against concurrent modification from callbacks + synchronized (futures) { + int offset = 0; + + for (ListenableFuture future : futures) { + try { + future.get(0, TimeUnit.NANOSECONDS); + } catch (final TimeoutException e) { + LOG.warn("Future #{}/{} not completed yet", offset, size); + } catch (final ExecutionException e) { + LOG.warn("Future #{}/{} failed", offset, size, e.getCause()); + } catch (final InterruptedException e) { + LOG.warn("Interrupted while examining future #{}/{}", offset, size, e); + } + + ++offset; } - - ++offset; } state = State.FAILED; runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds")); } - private boolean checkSuccessful() { - if (futures.isEmpty()) { - LOG.debug("Completed waiting for all futures"); - state = State.SUCCESSFUL; - scheduledFuture.cancel(false); - runSuccessful(txCounter); - return true; - } - - return false; - } - abstract ListenableFuture execWrite(final long txId); abstract void runFailed(Throwable cause);