import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.Queue;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.LoggerFactory;
abstract class AbstractTransactionHandler {
- private abstract static class Phase {
- abstract void txSuccess(ListenableFuture<Void> execFuture, long txId);
-
- abstract void txFailure(ListenableFuture<Void> execFuture, long txId, Throwable cause);
- }
-
- private static final class Running extends Phase {
- private final Queue<ListenableFuture<Void>> futures = new ArrayDeque<>();
- private Throwable failure;
-
- void addFuture(final ListenableFuture<Void> execFuture) {
- futures.add(execFuture);
- }
-
- @Override
- void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
- futures.remove(execFuture);
- }
-
- @Override
- void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
- futures.remove(execFuture);
- if (failure != null) {
- failure = cause;
- }
- }
-
- Optional<Throwable> getFailure() {
- return Optional.ofNullable(failure);
- }
- }
-
- private final class Collecting extends Phase {
- private final List<ListenableFuture<Void>> futures;
- private boolean done;
-
- Collecting(final Collection<ListenableFuture<Void>> futures) {
- this.futures = new ArrayList<>(futures);
- }
-
- @Override
- void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
- futures.remove(execFuture);
- if (futures.isEmpty() && !done) {
- LOG.debug("All futures completed successfully.");
- runSuccessful(txCounter);
- }
- }
-
- @Override
- void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
- futures.remove(execFuture);
- done = true;
- runFailed(cause);
- }
+ private enum State {
+ RUNNING,
+ WAITING,
+ SUCCESSFUL,
+ FAILED,
}
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
- static final int SECOND_AS_NANO = 1000000000;
+ static final int SECOND_AS_NANO = 1_000_000_000;
//2^20 as in the model
static final int MAX_ITEM = 1048576;
static final long INIT_TX_TIMEOUT_SECONDS = 125;
- private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(5);
-
- private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
- private final Stopwatch stopwatch = Stopwatch.createStarted();
+ private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
+
+ /*
+ * writingExecutor is a single thread executor. Only this thread will write to datastore,
+ * incurring sleep penalties if backend is not responsive. This thread never changes, but reads State.
+ * This thread only adds to futures set.
+ */
+ private final ScheduledExecutorService writingExecutor = FinalizableScheduledExecutorService.newSingleThread();
+ /*
+ * completingExecutor is a single thread executor. Only this thread writes to State.
+ * This thread should never incur any sleep penalty, so RPC response should always come on time.
+ * This thread only removes from futures set.
+ */
+ private final ScheduledExecutorService completingExecutor = FinalizableScheduledExecutorService.newSingleThread();
+ private final Collection<ListenableFuture<Void>> futures = Collections.synchronizedSet(new HashSet<>());
+ private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final long runtimeNanos;
private final long delayNanos;
- private ScheduledFuture<?> scheduledFuture;
+ private ScheduledFuture<?> writingFuture;
+ private ScheduledFuture<?> completingFuture;
private long txCounter;
- @GuardedBy("this")
- private Phase phase;
+ private volatile State state;
AbstractTransactionHandler(final TransactionsParams params) {
runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds());
}
final synchronized void doStart() {
- phase = new Running();
- scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
+ // Setup state first...
+ stopwatch.start();
+ state = State.RUNNING;
+
+ writingFuture = writingExecutor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
}
private void execute() {
+ // Single volatile access
+ final State local = state;
+
+ switch (local) {
+ case FAILED:
+ // This could happen due to scheduling artifacts
+ break;
+ case RUNNING:
+ runningExecute();
+ break;
+ default:
+ throw new IllegalStateException("Unhandled state " + local);
+ }
+ }
+
+ private void runningExecute() {
final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
- if (elapsed < runtimeNanos) {
- // Not completed yet: create a transaction and hook it up
- final long txId = txCounter++;
- final ListenableFuture<Void> execFuture = execWrite(txId);
-
- // Ordering is important: we need to add the future before hooking the callback
- synchronized (this) {
- ((Running) phase).addFuture(execFuture);
+ if (elapsed >= runtimeNanos) {
+ LOG.debug("Reached maximum run time with {} outstanding futures", futures.size());
+ completingExecutor.schedule(this::runtimeUp, 0, TimeUnit.SECONDS);
+ return;
+ }
+
+ // Not completed yet: create a transaction and hook it up
+ final long txId = txCounter++;
+ final ListenableFuture<Void> execFuture = execWrite(txId);
+ LOG.debug("New future #{} allocated", txId);
+
+ // Ordering is important: we need to add the future before hooking the callback
+ futures.add(execFuture);
+ Futures.addCallback(execFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ txSuccess(execFuture, txId);
}
- Futures.addCallback(execFuture, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- txSuccess(execFuture, txId);
- }
- @Override
- public void onFailure(final Throwable cause) {
- txFailure(execFuture, txId, cause);
- }
- });
- } else {
- startCollection();
- }
+ @Override
+ public void onFailure(final Throwable cause) {
+ txFailure(execFuture, txId, cause);
+ }
+ }, completingExecutor);
}
- private synchronized void startCollection() {
- scheduledFuture.cancel(false);
-
- final Running running = (Running) phase;
- final Optional<Throwable> failure = running.getFailure();
- if (failure.isPresent()) {
- executor.shutdown();
- runFailed(failure.get());
- return;
+ private void runtimeUp() {
+ // checkSuccessful has two call sites, it is simpler to create completingFuture unconditionally.
+ completingFuture = completingExecutor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ if (!checkSuccessful()) {
+ state = State.WAITING;
+ writingFuture.cancel(false);
}
+ }
- LOG.debug("Reached maximum run time with {} outstanding futures", running.futures.size());
- if (running.futures.isEmpty()) {
- executor.shutdown();
+ private boolean checkSuccessful() {
+ if (futures.isEmpty()) {
+ LOG.debug("Completed waiting for all futures");
+ state = State.SUCCESSFUL;
+ completingFuture.cancel(false);
runSuccessful(txCounter);
- return;
+ return true;
}
- phase = new Collecting(running.futures);
- executor.schedule(this::checkCollection, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
- executor.shutdown();
+ return false;
}
- final synchronized void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
+ final void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
LOG.debug("Future #{} completed successfully", txId);
- phase.txSuccess(execFuture, txId);
+ futures.remove(execFuture);
+
+ final State local = state;
+ switch (local) {
+ case FAILED:
+ case RUNNING:
+ // No-op
+ break;
+ case WAITING:
+ checkSuccessful();
+ break;
+ default:
+ throw new IllegalStateException("Unhandled state " + local);
+ }
}
- final synchronized void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
+ final void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
LOG.debug("Future #{} failed", txId, cause);
- phase.txFailure(execFuture, txId, cause);
+ futures.remove(execFuture);
+
+ final State local = state;
+ switch (local) {
+ case FAILED:
+ // no-op
+ break;
+ case RUNNING:
+ case WAITING:
+ state = State.FAILED;
+ writingFuture.cancel(false);
+ runFailed(cause);
+ break;
+ default:
+ throw new IllegalStateException("Unhandled state " + local);
+ }
}
- private synchronized void checkCollection() {
- final Collecting collecting = (Collecting) phase;
- if (!collecting.done) {
- final int size = collecting.futures.size();
- for (int i = 0; i < size; i++) {
- final ListenableFuture<Void> future = collecting.futures.get(i);
+ private void checkComplete() {
+ final int size = futures.size();
+ if (size == 0) {
+ return;
+ }
+
+ // Guards iteration against concurrent modification from callbacks
+ synchronized (futures) {
+ int offset = 0;
+ for (ListenableFuture<Void> future : futures) {
try {
future.get(0, TimeUnit.NANOSECONDS);
} catch (final TimeoutException e) {
- LOG.warn("Future #{}/{} not completed yet", i, size);
+ LOG.warn("Future #{}/{} not completed yet", offset, size);
} catch (final ExecutionException e) {
- LOG.warn("Future #{}/{} failed", i, size, e.getCause());
+ LOG.warn("Future #{}/{} failed", offset, size, e.getCause());
} catch (final InterruptedException e) {
- LOG.warn("Interrupted while examining future #{}/{}", i, size, e);
+ LOG.warn("Interrupted while examining future #{}/{}", offset, size, e);
}
- }
- runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
+ ++offset;
+ }
}
+
+ state = State.FAILED;
+ runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
}
- abstract ListenableFuture<Void> execWrite(final long txId);
+ abstract ListenableFuture<Void> execWrite(long txId);
abstract void runFailed(Throwable cause);