Bug 8494: Separate writing and completion threads 03/60703/1
authorVratko Polak <vrpolak@cisco.com>
Fri, 21 Jul 2017 10:24:49 +0000 (12:24 +0200)
committerRobert Varga <nite@hq.sk>
Mon, 24 Jul 2017 21:38:21 +0000 (21:38 +0000)
If AbstractTransactionHandler uses only one executor thread,
future completion callbacks are delayed by throttling on writes.
CSIT aims to detect RequestTimeoutException within a narrow window,
so a separate executor for callbacks is used now.

The delay would not be that critical, but the problem is the timing
between a scheduled execution which exceeds scheduling gaps. These
seem to hold up normally-submitted tasks, leading to futures never
completing.

Therefore we use two Executors and synchronize state modification
call sites. Hence the two tasks (throttled producer) and future
completions can run concurrently (aside from state synchronization).

Change-Id: I642c5295ab6188b2d7e1b5feae62ab7ef52d41eb
Signed-off-by: Vratko Polak <vrpolak@cisco.com>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 8744119235b90d89021567e5f12361d98b823b8f)

opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/WriteTransactionsHandler.java

index d0923ce6de7a5b4486f2292231c3fcc0f93dc471..d3b0a7b0497fbbc204edb6900738cc15fa0005fa 100644 (file)
@@ -12,6 +12,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collection;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.HashSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
@@ -56,15 +57,27 @@ abstract class AbstractTransactionHandler {
 
     private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
 
 
     private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
 
-    private final ScheduledExecutorService executor = FinalizableScheduledExecutorService.newSingleThread();
-    private final Collection<ListenableFuture<Void>> futures = new HashSet<>();
+    /*
+     * writingExecutor is a single thread executor. Only this thread will write to datastore,
+     * incurring sleep penalties if backend is not responsive. This thread never changes, but reads State.
+     * This thread only adds to futures set.
+     */
+    private final ScheduledExecutorService writingExecutor = FinalizableScheduledExecutorService.newSingleThread();
+    /*
+     * completingExecutor is a single thread executor. Only this thread writes to State.
+     * This thread should never incur any sleep penalty, so RPC response should always come on time.
+     * This thread only removes from futures set.
+     */
+    private final ScheduledExecutorService completingExecutor = FinalizableScheduledExecutorService.newSingleThread();
+    private final Collection<ListenableFuture<Void>> futures = Collections.synchronizedSet(new HashSet<>());
     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
     private final long runtimeNanos;
     private final long delayNanos;
 
     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
     private final long runtimeNanos;
     private final long delayNanos;
 
-    private ScheduledFuture<?> scheduledFuture;
+    private ScheduledFuture<?> writingFuture;
+    private ScheduledFuture<?> completingFuture;
     private long txCounter;
     private long txCounter;
-    private State state;
+    private volatile State state;
 
     AbstractTransactionHandler(final TransactionsParams params) {
         runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds());
 
     AbstractTransactionHandler(final TransactionsParams params) {
         runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds());
@@ -72,13 +85,18 @@ abstract class AbstractTransactionHandler {
     }
 
     final synchronized void doStart() {
     }
 
     final synchronized void doStart() {
-        scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
+        // Setup state first...
         stopwatch.start();
         state = State.RUNNING;
         stopwatch.start();
         state = State.RUNNING;
+
+        writingFuture = writingExecutor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
     }
 
     }
 
-    private synchronized void execute() {
-        switch (state) {
+    private void execute() {
+        // Single volatile access
+        final State local = state;
+
+        switch (local) {
             case FAILED:
                 // This could happen due to scheduling artifacts
                 break;
             case FAILED:
                 // This could happen due to scheduling artifacts
                 break;
@@ -86,7 +104,7 @@ abstract class AbstractTransactionHandler {
                 runningExecute();
                 break;
             default:
                 runningExecute();
                 break;
             default:
-                throw new IllegalStateException("Unhandled state " + state);
+                throw new IllegalStateException("Unhandled state " + local);
         }
     }
 
         }
     }
 
@@ -94,12 +112,7 @@ abstract class AbstractTransactionHandler {
         final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
         if (elapsed >= runtimeNanos) {
             LOG.debug("Reached maximum run time with {} outstanding futures", futures.size());
         final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
         if (elapsed >= runtimeNanos) {
             LOG.debug("Reached maximum run time with {} outstanding futures", futures.size());
-            if (!checkSuccessful()) {
-                state = State.WAITING;
-                scheduledFuture.cancel(false);
-                scheduledFuture = executor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-            }
-
+            completingExecutor.schedule(this::runtimeUp, 0, TimeUnit.SECONDS);
             return;
         }
 
             return;
         }
 
@@ -120,14 +133,36 @@ abstract class AbstractTransactionHandler {
             public void onFailure(final Throwable cause) {
                 txFailure(execFuture, txId, cause);
             }
             public void onFailure(final Throwable cause) {
                 txFailure(execFuture, txId, cause);
             }
-        }, executor);
+        }, completingExecutor);
+    }
+
+    private void runtimeUp() {
+        // checkSuccessful has two call sites, it is simpler to create completingFuture unconditionally.
+        completingFuture = completingExecutor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        if (!checkSuccessful()) {
+            state = State.WAITING;
+            writingFuture.cancel(false);
+        }
+    }
+
+    private boolean checkSuccessful() {
+        if (futures.isEmpty()) {
+            LOG.debug("Completed waiting for all futures");
+            state = State.SUCCESSFUL;
+            completingFuture.cancel(false);
+            runSuccessful(txCounter);
+            return true;
+        }
+
+        return false;
     }
 
     final void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
         LOG.debug("Future #{} completed successfully", txId);
         futures.remove(execFuture);
 
     }
 
     final void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
         LOG.debug("Future #{} completed successfully", txId);
         futures.remove(execFuture);
 
-        switch (state) {
+        final State local = state;
+        switch (local) {
             case FAILED:
             case RUNNING:
                 // No-op
             case FAILED:
             case RUNNING:
                 // No-op
@@ -136,7 +171,7 @@ abstract class AbstractTransactionHandler {
                 checkSuccessful();
                 break;
             default:
                 checkSuccessful();
                 break;
             default:
-                throw new IllegalStateException("Unhandled state " + state);
+                throw new IllegalStateException("Unhandled state " + local);
         }
     }
 
         }
     }
 
@@ -144,18 +179,19 @@ abstract class AbstractTransactionHandler {
         LOG.debug("Future #{} failed", txId, cause);
         futures.remove(execFuture);
 
         LOG.debug("Future #{} failed", txId, cause);
         futures.remove(execFuture);
 
-        switch (state) {
+        final State local = state;
+        switch (local) {
             case FAILED:
                 // no-op
                 break;
             case RUNNING:
             case WAITING:
                 state = State.FAILED;
             case FAILED:
                 // no-op
                 break;
             case RUNNING:
             case WAITING:
                 state = State.FAILED;
-                scheduledFuture.cancel(false);
+                writingFuture.cancel(false);
                 runFailed(cause);
                 break;
             default:
                 runFailed(cause);
                 break;
             default:
-                throw new IllegalStateException("Unhandled state " + state);
+                throw new IllegalStateException("Unhandled state " + local);
         }
     }
 
         }
     }
 
@@ -165,37 +201,29 @@ abstract class AbstractTransactionHandler {
             return;
         }
 
             return;
         }
 
-        int offset = 0;
-        for (ListenableFuture<Void> future : futures) {
-            try {
-                future.get(0, TimeUnit.NANOSECONDS);
-            } catch (final TimeoutException e) {
-                LOG.warn("Future #{}/{} not completed yet", offset, size);
-            } catch (final ExecutionException e) {
-                LOG.warn("Future #{}/{} failed", offset, size, e.getCause());
-            } catch (final InterruptedException e) {
-                LOG.warn("Interrupted while examining future #{}/{}", offset, size, e);
+        // Guards iteration against concurrent modification from callbacks
+        synchronized (futures) {
+            int offset = 0;
+
+            for (ListenableFuture<Void> future : futures) {
+                try {
+                    future.get(0, TimeUnit.NANOSECONDS);
+                } catch (final TimeoutException e) {
+                    LOG.warn("Future #{}/{} not completed yet", offset, size);
+                } catch (final ExecutionException e) {
+                    LOG.warn("Future #{}/{} failed", offset, size, e.getCause());
+                } catch (final InterruptedException e) {
+                    LOG.warn("Interrupted while examining future #{}/{}", offset, size, e);
+                }
+
+                ++offset;
             }
             }
-
-            ++offset;
         }
 
         state = State.FAILED;
         runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
     }
 
         }
 
         state = State.FAILED;
         runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
     }
 
-    private boolean checkSuccessful() {
-        if (futures.isEmpty()) {
-            LOG.debug("Completed waiting for all futures");
-            state = State.SUCCESSFUL;
-            scheduledFuture.cancel(false);
-            runSuccessful(txCounter);
-            return true;
-        }
-
-        return false;
-    }
-
     abstract ListenableFuture<Void> execWrite(final long txId);
 
     abstract void runFailed(Throwable cause);
     abstract ListenableFuture<Void> execWrite(final long txId);
 
     abstract void runFailed(Throwable cause);
index 797e252a914a6cdcd0a3811e1dcae9a4c12a2c6b..a8363c1d2f27880c49bcae2cc1c0d01c032aaeee 100644 (file)
@@ -68,9 +68,9 @@ public abstract class WriteTransactionsHandler extends AbstractTransactionHandle
         @Override
         public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
                 final AsyncTransaction<?, ?> transaction, final Throwable cause) {
         @Override
         public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
                 final AsyncTransaction<?, ?> transaction, final Throwable cause) {
-            LOG.warn("Transaction chain failed.", cause);
-            completionFuture.set(RpcResultBuilder.<WriteTransactionsOutput>failed()
-                    .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", cause).build());
+            // This is expected to happen frequently in isolation testing.
+            LOG.debug("Transaction chain failed.", cause);
+            // Do not return RPC here, rely on transaction failure to call runFailed.
         }
 
         @Override
         }
 
         @Override