Bug 8494: Separate writing and completion threads 03/60703/1
author Vratko Polak <vrpolak@cisco.com>
Fri, 21 Jul 2017 10:24:49 +0000 (12:24 +0200)
committer Robert Varga <nite@hq.sk>
Mon, 24 Jul 2017 21:38:21 +0000 (21:38 +0000)
If AbstractTransactionHandler uses only one executor thread,
future completion callbacks are delayed by throttling on writes.
CSIT aims to detect RequestTimeoutException within a narrow window,
so a separate executor for callbacks is used now.

The delay itself would not be that critical, but the problem is the
timing of scheduled executions which take longer than the gap between
scheduled runs. These seem to hold up normally-submitted tasks, leading
to futures never completing.

Therefore we use two Executors and synchronize the call sites which
modify state. This way the two kinds of tasks (the throttled producer
and future completions) can run concurrently, aside from state
synchronization.

Change-Id: I642c5295ab6188b2d7e1b5feae62ab7ef52d41eb
Signed-off-by: Vratko Polak <vrpolak@cisco.com>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 8744119235b90d89021567e5f12361d98b823b8f)

opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java

index d0923ce..d3b0a7b 100644 (file)
@@ -12,6 +12,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
@@ -56,15 +57,27 @@ abstract class AbstractTransactionHandler {
 
     private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
 
-    private final ScheduledExecutorService executor = FinalizableScheduledExecutorService.newSingleThread();
-    private final Collection<ListenableFuture<Void>> futures = new HashSet<>();
+    /*
+     * writingExecutor is a single thread executor. Only this thread will write to datastore,
+     * incurring sleep penalties if backend is not responsive. This thread never modifies State, it only reads it.
+     * This thread only adds to futures set.
+     */
+    private final ScheduledExecutorService writingExecutor = FinalizableScheduledExecutorService.newSingleThread();
+    /*
+     * completingExecutor is a single thread executor. Only this thread writes to State.
+     * This thread should never incur any sleep penalty, so RPC response should always come on time.
+     * This thread only removes from futures set.
+     */
+    private final ScheduledExecutorService completingExecutor = FinalizableScheduledExecutorService.newSingleThread();
+    private final Collection<ListenableFuture<Void>> futures = Collections.synchronizedSet(new HashSet<>());
     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
     private final long runtimeNanos;
     private final long delayNanos;
 
-    private ScheduledFuture<?> scheduledFuture;
+    private ScheduledFuture<?> writingFuture;
+    private ScheduledFuture<?> completingFuture;
     private long txCounter;
-    private State state;
+    private volatile State state;
 
     AbstractTransactionHandler(final TransactionsParams params) {
         runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds());
@@ -72,13 +85,18 @@ abstract class AbstractTransactionHandler {
     }
 
     final synchronized void doStart() {
-        scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
+        // Setup state first...
         stopwatch.start();
         state = State.RUNNING;
+
+        writingFuture = writingExecutor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
     }
 
-    private synchronized void execute() {
-        switch (state) {
+    private void execute() {
+        // Single volatile access
+        final State local = state;
+
+        switch (local) {
             case FAILED:
                 // This could happen due to scheduling artifacts
                 break;
@@ -86,7 +104,7 @@ abstract class AbstractTransactionHandler {
                 runningExecute();
                 break;
             default:
-                throw new IllegalStateException("Unhandled state " + state);
+                throw new IllegalStateException("Unhandled state " + local);
         }
     }
 
@@ -94,12 +112,7 @@ abstract class AbstractTransactionHandler {
         final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
         if (elapsed >= runtimeNanos) {
             LOG.debug("Reached maximum run time with {} outstanding futures", futures.size());
-            if (!checkSuccessful()) {
-                state = State.WAITING;
-                scheduledFuture.cancel(false);
-                scheduledFuture = executor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-            }
-
+            completingExecutor.schedule(this::runtimeUp, 0, TimeUnit.SECONDS);
             return;
         }
 
@@ -120,14 +133,36 @@ abstract class AbstractTransactionHandler {
             public void onFailure(final Throwable cause) {
                 txFailure(execFuture, txId, cause);
             }
-        }, executor);
+        }, completingExecutor);
+    }
+
+    private void runtimeUp() {
+        // checkSuccessful has two call sites, it is simpler to create completingFuture unconditionally.
+        completingFuture = completingExecutor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        if (!checkSuccessful()) {
+            state = State.WAITING;
+            writingFuture.cancel(false);
+        }
+    }
+
+    private boolean checkSuccessful() {
+        if (futures.isEmpty()) {
+            LOG.debug("Completed waiting for all futures");
+            state = State.SUCCESSFUL;
+            completingFuture.cancel(false);
+            runSuccessful(txCounter);
+            return true;
+        }
+
+        return false;
     }
 
     final void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
         LOG.debug("Future #{} completed successfully", txId);
         futures.remove(execFuture);
 
-        switch (state) {
+        final State local = state;
+        switch (local) {
             case FAILED:
             case RUNNING:
                 // No-op
@@ -136,7 +171,7 @@ abstract class AbstractTransactionHandler {
                 checkSuccessful();
                 break;
             default:
-                throw new IllegalStateException("Unhandled state " + state);
+                throw new IllegalStateException("Unhandled state " + local);
         }
     }
 
@@ -144,18 +179,19 @@ abstract class AbstractTransactionHandler {
         LOG.debug("Future #{} failed", txId, cause);
         futures.remove(execFuture);
 
-        switch (state) {
+        final State local = state;
+        switch (local) {
             case FAILED:
                 // no-op
                 break;
             case RUNNING:
             case WAITING:
                 state = State.FAILED;
-                scheduledFuture.cancel(false);
+                writingFuture.cancel(false);
                 runFailed(cause);
                 break;
             default:
-                throw new IllegalStateException("Unhandled state " + state);
+                throw new IllegalStateException("Unhandled state " + local);
         }
     }
 
@@ -165,37 +201,29 @@ abstract class AbstractTransactionHandler {
             return;
         }
 
-        int offset = 0;
-        for (ListenableFuture<Void> future : futures) {
-            try {
-                future.get(0, TimeUnit.NANOSECONDS);
-            } catch (final TimeoutException e) {
-                LOG.warn("Future #{}/{} not completed yet", offset, size);
-            } catch (final ExecutionException e) {
-                LOG.warn("Future #{}/{} failed", offset, size, e.getCause());
-            } catch (final InterruptedException e) {
-                LOG.warn("Interrupted while examining future #{}/{}", offset, size, e);
+        // Guards iteration against concurrent modification from callbacks
+        synchronized (futures) {
+            int offset = 0;
+
+            for (ListenableFuture<Void> future : futures) {
+                try {
+                    future.get(0, TimeUnit.NANOSECONDS);
+                } catch (final TimeoutException e) {
+                    LOG.warn("Future #{}/{} not completed yet", offset, size);
+                } catch (final ExecutionException e) {
+                    LOG.warn("Future #{}/{} failed", offset, size, e.getCause());
+                } catch (final InterruptedException e) {
+                    LOG.warn("Interrupted while examining future #{}/{}", offset, size, e);
+                }
+
+                ++offset;
             }
-
-            ++offset;
         }
 
         state = State.FAILED;
         runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
     }
 
-    private boolean checkSuccessful() {
-        if (futures.isEmpty()) {
-            LOG.debug("Completed waiting for all futures");
-            state = State.SUCCESSFUL;
-            scheduledFuture.cancel(false);
-            runSuccessful(txCounter);
-            return true;
-        }
-
-        return false;
-    }
-
     abstract ListenableFuture<Void> execWrite(final long txId);
 
     abstract void runFailed(Throwable cause);
index 797e252..a8363c1 100644 (file)
@@ -68,9 +68,9 @@ public abstract class WriteTransactionsHandler extends AbstractTransactionHandle
         @Override
         public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
                 final AsyncTransaction<?, ?> transaction, final Throwable cause) {
-            LOG.warn("Transaction chain failed.", cause);
-            completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
-                    .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build());
+            // This is expected to happen frequently in isolation testing.
+            LOG.debug("Transaction chain failed.", cause);
+            // Do not return RPC here, rely on transaction failure to call runFailed.
         }
 
         @Override

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.