BUG-8403: do not throttle purge requests 06/58206/1
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 25 May 2017 16:10:57 +0000 (18:10 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 4 Jun 2017 07:47:42 +0000 (09:47 +0200)
It seems we are getting stuck after replay on purge requests,
which are dispatched internally.

Make sure we do not use sendRequest() in obvious replay places,
nor for purge requests. Also add a debug upcall if we happen to
sleep for more than 100msec.

Change-Id: Iec667f2039610f3f036e6b88c7c7e7b773cdfc19
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 20ece8c549211d1c453f1763132bb0a0ca7be0e0)

opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java

index 6cb89eec1601e5fa901f89e7ac37d7e254698d66..32becc040d4b594cd5657889e9268ec8469597cf 100644 (file)
@@ -69,6 +69,9 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     @VisibleForTesting
     static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
 
     @VisibleForTesting
     static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
 
+    // Emit a debug entry if we sleep for more that this amount
+    private static final long DEBUG_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
+
     private final Lock lock = new ReentrantLock();
     private final ClientActorContext context;
     @GuardedBy("lock")
     private final Lock lock = new ReentrantLock();
     private final ClientActorContext context;
     @GuardedBy("lock")
@@ -132,6 +135,9 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         final long now = currentTime();
         final long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now);
         try {
         final long now = currentTime();
         final long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now);
         try {
+            if (delay >= DEBUG_DELAY_NANOS && LOG.isDebugEnabled()) {
+                LOG.debug("Sleeping for {}ms", TimeUnit.NANOSECONDS.toMillis(delay));
+            }
             TimeUnit.NANOSECONDS.sleep(delay);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             TimeUnit.NANOSECONDS.sleep(delay);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
index b8057c5afe428fd162e0ddf02c6a2c6d11eab108..bf56376fcee3d538d483a063d652ae6d3d5a5654 100644 (file)
@@ -334,7 +334,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
         sendRequest(abortRequest(), resp -> {
             LOG.debug("Transaction {} abort completed with {}", getIdentifier(), resp);
 
         sendRequest(abortRequest(), resp -> {
             LOG.debug("Transaction {} abort completed with {}", getIdentifier(), resp);
-            sendPurge();
+            enqueuePurge();
         });
     }
 
         });
     }
 
@@ -352,7 +352,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
             // This is a terminal request, hence we do not need to record it
             LOG.debug("Transaction {} abort completed", this);
 
             // This is a terminal request, hence we do not need to record it
             LOG.debug("Transaction {} abort completed", this);
-            sendPurge();
+            enqueuePurge();
         });
     }
 
         });
     }
 
@@ -403,7 +403,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
                     // This is a terminal request, hence we do not need to record it
                     LOG.debug("Transaction {} directCommit completed", this);
 
                     // This is a terminal request, hence we do not need to record it
                     LOG.debug("Transaction {} directCommit completed", this);
-                    sendPurge();
+                    enqueuePurge();
                 });
 
                 return ret;
                 });
 
                 return ret;
@@ -497,20 +497,27 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             }
 
             LOG.debug("Transaction {} doCommit completed", this);
             }
 
             LOG.debug("Transaction {} doCommit completed", this);
-            sendPurge();
+            enqueuePurge();
         });
     }
 
         });
     }
 
-    private void sendPurge() {
-        sendPurge(null);
+    private void enqueuePurge() {
+        enqueuePurge(null);
     }
 
     }
 
-    final void sendPurge(final Consumer<Response<?, ?>> callback) {
-        sendRequest(purgeRequest(), resp -> completePurge(resp, callback));
+    final void enqueuePurge(final Consumer<Response<?, ?>> callback) {
+        // Purge request are dispatched internally, hence should not wait
+        enqueuePurge(callback, parent.currentTime());
     }
 
     final void enqueuePurge(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
     }
 
     final void enqueuePurge(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
-        enqueueRequest(purgeRequest(), resp -> completePurge(resp, callback), enqueuedTicks);
+        enqueueRequest(purgeRequest(), resp -> {
+            LOG.debug("Transaction {} purge completed", this);
+            parent.completeTransaction(this);
+            if (callback != null) {
+                callback.accept(resp);
+            }
+        }, enqueuedTicks);
     }
 
     private TransactionPurgeRequest purgeRequest() {
     }
 
     private TransactionPurgeRequest purgeRequest() {
@@ -518,14 +525,6 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         return new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
     }
 
         return new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
     }
 
-    private void completePurge(final Response<?, ?> resp, final Consumer<Response<?, ?>> callback) {
-        LOG.debug("Transaction {} purge completed", this);
-        parent.completeTransaction(this);
-        if (callback != null) {
-            callback.accept(resp);
-        }
-    }
-
     // Called with the connection unlocked
     final synchronized void startReconnect() {
         // At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous
     // Called with the connection unlocked
     final synchronized void startReconnect() {
         // At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous
index 49aedaf83d00f50223966f1cc941a69876072834..02f8e5a9532abd1a8ad45c2824c205694177a762 100644 (file)
@@ -162,7 +162,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         } else if (handleReadRequest(request, callback)) {
             // No-op
         } else if (request instanceof TransactionPurgeRequest) {
         } else if (handleReadRequest(request, callback)) {
             // No-op
         } else if (request instanceof TransactionPurgeRequest) {
-            sendPurge(callback);
+            enqueuePurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request " + request);
         }
         } else {
             throw new IllegalArgumentException("Unhandled request " + request);
         }
@@ -202,7 +202,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
             successor.abort();
         } else if (request instanceof TransactionPurgeRequest) {
             LOG.debug("Forwarding purge {} to successor {}", request, successor);
             successor.abort();
         } else if (request instanceof TransactionPurgeRequest) {
             LOG.debug("Forwarding purge {} to successor {}", request, successor);
-            successor.sendPurge(callback);
+            successor.enqueuePurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
@@ -214,7 +214,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         if (request instanceof AbortLocalTransactionRequest) {
             successor.sendAbort(request, callback);
         } else if (request instanceof TransactionPurgeRequest) {
         if (request instanceof AbortLocalTransactionRequest) {
             successor.sendAbort(request, callback);
         } else if (request instanceof TransactionPurgeRequest) {
-            successor.sendPurge(callback);
+            successor.enqueuePurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
index 34e8ba37a379669de09013099ad83a8e9ceaa38c..88a79775baa94730eba45a01ed8e229c0b701c69 100644 (file)
@@ -223,7 +223,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
                 if (identifier.equals(req.getTarget())) {
                     Verify.verify(req instanceof LocalHistoryRequest);
                     if (req instanceof CreateLocalHistoryRequest) {
                 if (identifier.equals(req.getTarget())) {
                     Verify.verify(req instanceof LocalHistoryRequest);
                     if (req instanceof CreateLocalHistoryRequest) {
-                        successor.connection.sendRequest(req, e.getCallback());
+                        successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
                         it.remove();
                         break;
                     }
                         it.remove();
                         break;
                     }
@@ -246,7 +246,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
                 if (identifier.equals(req.getTarget())) {
                     Verify.verify(req instanceof LocalHistoryRequest);
                     if (req instanceof DestroyLocalHistoryRequest) {
                 if (identifier.equals(req.getTarget())) {
                     Verify.verify(req instanceof LocalHistoryRequest);
                     if (req instanceof DestroyLocalHistoryRequest) {
-                        successor.connection.sendRequest(req, e.getCallback());
+                        successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
                         it.remove();
                         break;
                     }
                         it.remove();
                         break;
                     }
index 0095ec58a37512c66aefd250652980a9b4dce0fb..6e54695532a19f5eff87f66fc14662d683120c5b 100644 (file)
@@ -361,7 +361,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             ensureFlushedBuider();
             sendDoAbort(callback);
         } else if (request instanceof TransactionPurgeRequest) {
             ensureFlushedBuider();
             sendDoAbort(callback);
         } else if (request instanceof TransactionPurgeRequest) {
-            sendPurge(callback);
+            enqueuePurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request {}" + request);
         }
         } else {
             throw new IllegalArgumentException("Unhandled request {}" + request);
         }