X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2Fimpl%2FAbstractTransactionHandler.java;h=e821a2b5769211d53ad5cbcc2a43d58ba1f15e31;hb=da174be7e22b16d4ac80cccefdc52b209b700745;hp=ea0749a2d9d11ee615381f462c69aee4d62de0d2;hpb=e9bda7f551d3c514e23f2d282a1ef1197efa382f;p=controller.git diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java index ea0749a2d9..e821a2b576 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java @@ -8,17 +8,21 @@ package org.opendaylight.controller.clustering.it.provider.impl; import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import java.util.ArrayDeque; -import java.util.Queue; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import org.opendaylight.mdsal.common.api.CommitInfo; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -35,7 +39,7 @@ abstract class AbstractTransactionHandler { private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class); - static final int SECOND_AS_NANO = 1000000000; + static final int SECOND_AS_NANO = 1_000_000_000; //2^20 as in the model static final int MAX_ITEM = 1048576; @@ -55,31 +59,49 @@ abstract class AbstractTransactionHandler { static final long INIT_TX_TIMEOUT_SECONDS = 125; - private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(5); - - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - private final Queue> futures = new ArrayDeque<>(); + private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15); + private static final AtomicLong COUNTER = new AtomicLong(); + + /* + * writingExecutor is a single thread executor. Only this thread will write to datastore, + * incurring sleep penalties if backend is not responsive. This thread never changes, but reads State. + * This thread only adds to futures set. + */ + private final ScheduledExecutorService writingExecutor = newExecutorService("writing"); + /* + * completingExecutor is a single thread executor. Only this thread writes to State. + * This thread should never incur any sleep penalty, so RPC response should always come on time. + * This thread only removes from futures set. + */ + private final ScheduledExecutorService completingExecutor = newExecutorService("completing"); + private final Collection> futures = Collections.synchronizedSet(new HashSet<>()); private final Stopwatch stopwatch = Stopwatch.createUnstarted(); private final long runtimeNanos; private final long delayNanos; - private ScheduledFuture scheduledFuture; - private long txCounter; - private State state; + private ScheduledFuture writingFuture; + private ScheduledFuture completingFuture; + private final AtomicLong txCounter = new AtomicLong(); + private volatile State state; AbstractTransactionHandler(final TransactionsParams params) { - runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds()); - delayNanos = SECOND_AS_NANO / params.getTransactionsPerSecond(); + runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds().toJava()); + delayNanos = SECOND_AS_NANO / params.getTransactionsPerSecond().toJava(); } final synchronized void doStart() { - scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS); + // Setup state first... stopwatch.start(); state = State.RUNNING; + + writingFuture = writingExecutor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS); } - private synchronized void execute() { - switch (state) { + private void execute() { + // Single volatile access + final State local = state; + + switch (local) { case FAILED: // This could happen due to scheduling artifacts break; @@ -87,7 +109,7 @@ abstract class AbstractTransactionHandler { runningExecute(); break; default: - throw new IllegalStateException("Unhandled state " + state); + throw new IllegalStateException("Unhandled state " + local); } } @@ -95,26 +117,20 @@ abstract class AbstractTransactionHandler { final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); if (elapsed >= runtimeNanos) { LOG.debug("Reached maximum run time with {} outstanding futures", futures.size()); - if (!checkSuccessful()) { - state = State.WAITING; - scheduledFuture.cancel(false); - scheduledFuture = executor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS); - executor.shutdown(); - } - + completingExecutor.schedule(this::runtimeUp, 0, TimeUnit.SECONDS); return; } // Not completed yet: create a transaction and hook it up - final long txId = txCounter++; - final ListenableFuture execFuture = execWrite(txId); + final long txId = txCounter.incrementAndGet(); + final FluentFuture execFuture = execWrite(txId); LOG.debug("New future #{} allocated", txId); // Ordering is important: we need to add the future before hooking the callback futures.add(execFuture); - Futures.addCallback(execFuture, new FutureCallback() { + execFuture.addCallback(new FutureCallback() { @Override - public void onSuccess(final Void result) { + public void onSuccess(final CommitInfo result) { txSuccess(execFuture, txId); } @@ -122,14 +138,37 @@ abstract class AbstractTransactionHandler { public void onFailure(final Throwable cause) { txFailure(execFuture, txId, cause); } - }); + }, completingExecutor); + } + + private void runtimeUp() { + // checkSuccessful has two call sites, it is simpler to create completingFuture unconditionally. + completingFuture = completingExecutor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (!checkSuccessful()) { + state = State.WAITING; + writingFuture.cancel(false); + } } - final synchronized void txSuccess(final ListenableFuture execFuture, final long txId) { + private boolean checkSuccessful() { + if (futures.isEmpty()) { + LOG.debug("Completed waiting for all futures"); + state = State.SUCCESSFUL; + completingFuture.cancel(false); + runSuccessful(txCounter.get()); + shutdownExecutors(); + return true; + } + + return false; + } + + final void txSuccess(final ListenableFuture execFuture, final long txId) { LOG.debug("Future #{} completed successfully", txId); futures.remove(execFuture); - switch (state) { + final State local = state; + switch (local) { case FAILED: case RUNNING: // No-op @@ -138,73 +177,81 @@ abstract class AbstractTransactionHandler { checkSuccessful(); break; default: - throw new IllegalStateException("Unhandled state " + state); + throw new IllegalStateException("Unhandled state " + local); } } - final synchronized void txFailure(final ListenableFuture execFuture, final long txId, final Throwable cause) { - LOG.debug("Future #{} failed", txId, cause); + final void txFailure(final ListenableFuture execFuture, final long txId, final Throwable cause) { + LOG.error("Commit future failed for tx # {}", txId, cause); futures.remove(execFuture); - switch (state) { + final State local = state; + switch (local) { case FAILED: // no-op break; case RUNNING: case WAITING: state = State.FAILED; - scheduledFuture.cancel(false); - executor.shutdown(); - runFailed(cause); + writingFuture.cancel(false); + runFailed(cause, txId); + shutdownExecutors(); break; default: - throw new IllegalStateException("Unhandled state " + state); + throw new IllegalStateException("Unhandled state " + local); } } - private synchronized void checkComplete() { + private void checkComplete() { final int size = futures.size(); if (size == 0) { return; } - int offset = 0; - for (ListenableFuture future : futures) { - try { - future.get(0, TimeUnit.NANOSECONDS); - } catch (final TimeoutException e) { - LOG.warn("Future #{}/{} not completed yet", offset, size); - } catch (final ExecutionException e) { - LOG.warn("Future #{}/{} failed", offset, size, e.getCause()); - } catch (final InterruptedException e) { - LOG.warn("Interrupted while examining future #{}/{}", offset, size, e); + // Guards iteration against concurrent modification from callbacks + synchronized (futures) { + int offset = 0; + + for (ListenableFuture future : futures) { + try { + future.get(0, TimeUnit.NANOSECONDS); + } catch (final TimeoutException e) { + LOG.warn("Future #{}/{} not completed yet", offset, size); + } catch (final ExecutionException e) { + LOG.warn("Future #{}/{} failed", offset, size, e.getCause()); + } catch (final InterruptedException e) { + LOG.warn("Interrupted while examining future #{}/{}", offset, size, e); + } + + ++offset; } - - ++offset; } state = State.FAILED; - runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds")); + runTimedOut("Transactions did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"); + shutdownExecutors(); } - private boolean checkSuccessful() { - if (futures.isEmpty()) { - LOG.debug("Completed waiting for all futures"); - state = State.SUCCESSFUL; - scheduledFuture.cancel(false); - executor.shutdown(); - runSuccessful(txCounter); - return true; - } - - return false; + private void shutdownExecutors() { + writingExecutor.shutdown(); + completingExecutor.shutdown(); } - abstract ListenableFuture execWrite(final long txId); + abstract FluentFuture execWrite(long txId); - abstract void runFailed(Throwable cause); + abstract void runFailed(Throwable cause, long txId); abstract void runSuccessful(long allTx); - abstract void runTimedOut(Exception cause); + abstract void runTimedOut(String cause); + + private ScheduledExecutorService newExecutorService(final String kind) { + final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat(getClass().getSimpleName() + "-" + kind + "-" + COUNTER.getAndIncrement() + "%d") + .build()); + executor.setKeepAliveTime(15, TimeUnit.SECONDS); + executor.allowCoreThreadTimeOut(true); + return executor; + } }