X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2Fimpl%2FAbstractTransactionHandler.java;h=1a0f39e6936dec0152da35cb7f96ee8e832be959;hb=3859df9beca8f13f1ff2b2744ed3470a1715bec3;hp=ea0749a2d9d11ee615381f462c69aee4d62de0d2;hpb=e9bda7f551d3c514e23f2d282a1ef1197efa382f;p=controller.git diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java index ea0749a2d9..1a0f39e693 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java @@ -11,14 +11,15 @@ import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import java.util.ArrayDeque; -import java.util.Queue; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -35,7 +36,7 @@ abstract class AbstractTransactionHandler { private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class); - static final int SECOND_AS_NANO = 1000000000; + static final int SECOND_AS_NANO = 1_000_000_000; //2^20 as in the model static final int MAX_ITEM = 1048576; @@ -55,17 +56,29 @@ abstract class AbstractTransactionHandler { static final long INIT_TX_TIMEOUT_SECONDS = 125; - private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(5); - - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - private final Queue> futures = new ArrayDeque<>(); + private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15); + + /* + * writingExecutor is a single thread executor. Only this thread will write to datastore, + * incurring sleep penalties if backend is not responsive. This thread never changes, but reads State. + * This thread only adds to futures set. + */ + private final ScheduledExecutorService writingExecutor = FinalizableScheduledExecutorService.newSingleThread(); + /* + * completingExecutor is a single thread executor. Only this thread writes to State. + * This thread should never incur any sleep penalty, so RPC response should always come on time. + * This thread only removes from futures set. + */ + private final ScheduledExecutorService completingExecutor = FinalizableScheduledExecutorService.newSingleThread(); + private final Collection> futures = Collections.synchronizedSet(new HashSet<>()); private final Stopwatch stopwatch = Stopwatch.createUnstarted(); private final long runtimeNanos; private final long delayNanos; - private ScheduledFuture scheduledFuture; - private long txCounter; - private State state; + private ScheduledFuture writingFuture; + private ScheduledFuture completingFuture; + private final AtomicLong txCounter = new AtomicLong(); + private volatile State state; AbstractTransactionHandler(final TransactionsParams params) { runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds()); @@ -73,13 +86,18 @@ abstract class AbstractTransactionHandler { } final synchronized void doStart() { - scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS); + // Setup state first... stopwatch.start(); state = State.RUNNING; + + writingFuture = writingExecutor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS); } - private synchronized void execute() { - switch (state) { + private void execute() { + // Single volatile access + final State local = state; + + switch (local) { case FAILED: // This could happen due to scheduling artifacts break; @@ -87,7 +105,7 @@ abstract class AbstractTransactionHandler { runningExecute(); break; default: - throw new IllegalStateException("Unhandled state " + state); + throw new IllegalStateException("Unhandled state " + local); } } @@ -95,26 +113,20 @@ abstract class AbstractTransactionHandler { final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); if (elapsed >= runtimeNanos) { LOG.debug("Reached maximum run time with {} outstanding futures", futures.size()); - if (!checkSuccessful()) { - state = State.WAITING; - scheduledFuture.cancel(false); - scheduledFuture = executor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS); - executor.shutdown(); - } - + completingExecutor.schedule(this::runtimeUp, 0, TimeUnit.SECONDS); return; } // Not completed yet: create a transaction and hook it up - final long txId = txCounter++; - final ListenableFuture execFuture = execWrite(txId); + final long txId = txCounter.incrementAndGet(); + final ListenableFuture execFuture = execWrite(txId); LOG.debug("New future #{} allocated", txId); // Ordering is important: we need to add the future before hooking the callback futures.add(execFuture); - Futures.addCallback(execFuture, new FutureCallback() { + Futures.addCallback(execFuture, new FutureCallback() { @Override - public void onSuccess(final Void result) { + public void onSuccess(final Object result) { txSuccess(execFuture, txId); } @@ -122,14 +134,36 @@ abstract class AbstractTransactionHandler { public void onFailure(final Throwable cause) { txFailure(execFuture, txId, cause); } - }); + }, completingExecutor); + } + + private void runtimeUp() { + // checkSuccessful has two call sites, it is simpler to create completingFuture unconditionally. + completingFuture = completingExecutor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (!checkSuccessful()) { + state = State.WAITING; + writingFuture.cancel(false); + } } - final synchronized void txSuccess(final ListenableFuture execFuture, final long txId) { + private boolean checkSuccessful() { + if (futures.isEmpty()) { + LOG.debug("Completed waiting for all futures"); + state = State.SUCCESSFUL; + completingFuture.cancel(false); + runSuccessful(txCounter.get()); + return true; + } + + return false; + } + + final void txSuccess(final ListenableFuture execFuture, final long txId) { LOG.debug("Future #{} completed successfully", txId); futures.remove(execFuture); - switch (state) { + final State local = state; + switch (local) { case FAILED: case RUNNING: // No-op @@ -138,73 +172,64 @@ abstract class AbstractTransactionHandler { checkSuccessful(); break; default: - throw new IllegalStateException("Unhandled state " + state); + throw new IllegalStateException("Unhandled state " + local); } } - final synchronized void txFailure(final ListenableFuture execFuture, final long txId, final Throwable cause) { - LOG.debug("Future #{} failed", txId, cause); + final void txFailure(final ListenableFuture execFuture, final long txId, final Throwable cause) { + LOG.error("Commit future failed for tx # {}", txId, cause); futures.remove(execFuture); - switch (state) { + final State local = state; + switch (local) { case FAILED: // no-op break; case RUNNING: case WAITING: state = State.FAILED; - scheduledFuture.cancel(false); - executor.shutdown(); - runFailed(cause); + writingFuture.cancel(false); + runFailed(cause, txId); break; default: - throw new IllegalStateException("Unhandled state " + state); + throw new IllegalStateException("Unhandled state " + local); } } - private synchronized void checkComplete() { + private void checkComplete() { final int size = futures.size(); if (size == 0) { return; } - int offset = 0; - for (ListenableFuture future : futures) { - try { - future.get(0, TimeUnit.NANOSECONDS); - } catch (final TimeoutException e) { - LOG.warn("Future #{}/{} not completed yet", offset, size); - } catch (final ExecutionException e) { - LOG.warn("Future #{}/{} failed", offset, size, e.getCause()); - } catch (final InterruptedException e) { - LOG.warn("Interrupted while examining future #{}/{}", offset, size, e); + // Guards iteration against concurrent modification from callbacks + synchronized (futures) { + int offset = 0; + + for (ListenableFuture future : futures) { + try { + future.get(0, TimeUnit.NANOSECONDS); + } catch (final TimeoutException e) { + LOG.warn("Future #{}/{} not completed yet", offset, size); + } catch (final ExecutionException e) { + LOG.warn("Future #{}/{} failed", offset, size, e.getCause()); + } catch (final InterruptedException e) { + LOG.warn("Interrupted while examining future #{}/{}", offset, size, e); + } + + ++offset; } - - ++offset; } state = State.FAILED; - runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds")); - } - - private boolean checkSuccessful() { - if (futures.isEmpty()) { - LOG.debug("Completed waiting for all futures"); - state = State.SUCCESSFUL; - scheduledFuture.cancel(false); - executor.shutdown(); - runSuccessful(txCounter); - return true; - } - - return false; + runTimedOut("Transactions did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"); } - abstract ListenableFuture execWrite(final long txId); + abstract ListenableFuture execWrite(long txId); - abstract void runFailed(Throwable cause); + abstract void runFailed(Throwable cause, long txId); abstract void runSuccessful(long allTx); - abstract void runTimedOut(Exception cause); + abstract void runTimedOut(String cause); }