BUG-8403: do not throttle purge requests 06/58206/1
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 25 May 2017 16:10:57 +0000 (18:10 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 4 Jun 2017 07:47:42 +0000 (09:47 +0200)
It seems we are getting stuck after replay on purge requests,
which are dispatched internally.

Make sure we do not use sendRequest() in obvious replay places,
nor for purge requests. Also add a debug upcall if we happen to
sleep for more than 100msec.

Change-Id: Iec667f2039610f3f036e6b88c7c7e7b773cdfc19
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 20ece8c549211d1c453f1763132bb0a0ca7be0e0)

opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java

index 6cb89ee..32becc0 100644 (file)
@@ -69,6 +69,9 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     @VisibleForTesting
     static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
 
+    // Emit a debug entry if we sleep for more that this amount
+    private static final long DEBUG_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
+
     private final Lock lock = new ReentrantLock();
     private final ClientActorContext context;
     @GuardedBy("lock")
@@ -132,6 +135,9 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         final long now = currentTime();
         final long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now);
         try {
+            if (delay >= DEBUG_DELAY_NANOS && LOG.isDebugEnabled()) {
+                LOG.debug("Sleeping for {}ms", TimeUnit.NANOSECONDS.toMillis(delay));
+            }
             TimeUnit.NANOSECONDS.sleep(delay);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
index b8057c5..bf56376 100644 (file)
@@ -334,7 +334,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
         sendRequest(abortRequest(), resp -> {
             LOG.debug("Transaction {} abort completed with {}", getIdentifier(), resp);
-            sendPurge();
+            enqueuePurge();
         });
     }
 
@@ -352,7 +352,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
             // This is a terminal request, hence we do not need to record it
             LOG.debug("Transaction {} abort completed", this);
-            sendPurge();
+            enqueuePurge();
         });
     }
 
@@ -403,7 +403,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
                     // This is a terminal request, hence we do not need to record it
                     LOG.debug("Transaction {} directCommit completed", this);
-                    sendPurge();
+                    enqueuePurge();
                 });
 
                 return ret;
@@ -497,20 +497,27 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             }
 
             LOG.debug("Transaction {} doCommit completed", this);
-            sendPurge();
+            enqueuePurge();
         });
     }
 
-    private void sendPurge() {
-        sendPurge(null);
+    private void enqueuePurge() {
+        enqueuePurge(null);
     }
 
-    final void sendPurge(final Consumer<Response<?, ?>> callback) {
-        sendRequest(purgeRequest(), resp -> completePurge(resp, callback));
+    final void enqueuePurge(final Consumer<Response<?, ?>> callback) {
+        // Purge request are dispatched internally, hence should not wait
+        enqueuePurge(callback, parent.currentTime());
     }
 
     final void enqueuePurge(final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
-        enqueueRequest(purgeRequest(), resp -> completePurge(resp, callback), enqueuedTicks);
+        enqueueRequest(purgeRequest(), resp -> {
+            LOG.debug("Transaction {} purge completed", this);
+            parent.completeTransaction(this);
+            if (callback != null) {
+                callback.accept(resp);
+            }
+        }, enqueuedTicks);
     }
 
     private TransactionPurgeRequest purgeRequest() {
@@ -518,14 +525,6 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         return new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
     }
 
-    private void completePurge(final Response<?, ?> resp, final Consumer<Response<?, ?>> callback) {
-        LOG.debug("Transaction {} purge completed", this);
-        parent.completeTransaction(this);
-        if (callback != null) {
-            callback.accept(resp);
-        }
-    }
-
     // Called with the connection unlocked
     final synchronized void startReconnect() {
         // At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous
index 49aedaf..02f8e5a 100644 (file)
@@ -162,7 +162,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         } else if (handleReadRequest(request, callback)) {
             // No-op
         } else if (request instanceof TransactionPurgeRequest) {
-            sendPurge(callback);
+            enqueuePurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request " + request);
         }
@@ -202,7 +202,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
             successor.abort();
         } else if (request instanceof TransactionPurgeRequest) {
             LOG.debug("Forwarding purge {} to successor {}", request, successor);
-            successor.sendPurge(callback);
+            successor.enqueuePurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
@@ -214,7 +214,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
         if (request instanceof AbortLocalTransactionRequest) {
             successor.sendAbort(request, callback);
         } else if (request instanceof TransactionPurgeRequest) {
-            successor.sendPurge(callback);
+            successor.enqueuePurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request" + request);
         }
index 34e8ba3..88a7977 100644 (file)
@@ -223,7 +223,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
                 if (identifier.equals(req.getTarget())) {
                     Verify.verify(req instanceof LocalHistoryRequest);
                     if (req instanceof CreateLocalHistoryRequest) {
-                        successor.connection.sendRequest(req, e.getCallback());
+                        successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
                         it.remove();
                         break;
                     }
@@ -246,7 +246,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
                 if (identifier.equals(req.getTarget())) {
                     Verify.verify(req instanceof LocalHistoryRequest);
                     if (req instanceof DestroyLocalHistoryRequest) {
-                        successor.connection.sendRequest(req, e.getCallback());
+                        successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
                         it.remove();
                         break;
                     }
index 0095ec5..6e54695 100644 (file)
@@ -361,7 +361,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
             ensureFlushedBuider();
             sendDoAbort(callback);
         } else if (request instanceof TransactionPurgeRequest) {
-            sendPurge(callback);
+            enqueuePurge(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request {}" + request);
         }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.