From: Robert Varga Date: Thu, 25 May 2017 16:10:57 +0000 (+0200) Subject: BUG-8403: do not throttle purge requests X-Git-Tag: release/carbon-sr1~57 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=20ece8c549211d1c453f1763132bb0a0ca7be0e0 BUG-8403: do not throttle purge requests It seems we are getting stuck after replay on purge requests, which are dispatched internally. Make sure we do not use sendRequest() in obvious replay places, nor for purge requests. Also add a debug upcall if we happen to sleep for more than 100msec. Change-Id: Iec667f2039610f3f036e6b88c7c7e7b773cdfc19 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java index 6cb89eec16..32becc040d 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java @@ -69,6 +69,9 @@ public abstract class AbstractClientConnection { @VisibleForTesting static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15); + // Emit a debug entry if we sleep for more that this amount + private static final long DEBUG_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100); + private final Lock lock = new ReentrantLock(); private final ClientActorContext context; @GuardedBy("lock") @@ -132,6 +135,9 @@ public abstract class AbstractClientConnection { final long now = currentTime(); final long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now); try { + if (delay >= DEBUG_DELAY_NANOS && LOG.isDebugEnabled()) { + LOG.debug("Sleeping for {}ms", TimeUnit.NANOSECONDS.toMillis(delay)); + } TimeUnit.NANOSECONDS.sleep(delay); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index b8057c5afe..bf56376fce 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -334,7 +334,7 @@ abstract class AbstractProxyTransaction implements Identifiable { LOG.debug("Transaction {} abort completed with {}", getIdentifier(), resp); - sendPurge(); + enqueuePurge(); }); } @@ -352,7 +352,7 @@ abstract class AbstractProxyTransaction implements Identifiable> callback) { - sendRequest(purgeRequest(), resp -> completePurge(resp, callback)); + final void enqueuePurge(final Consumer> callback) { + // Purge request are dispatched internally, hence should not wait + enqueuePurge(callback, parent.currentTime()); } final void enqueuePurge(final Consumer> callback, final long enqueuedTicks) { - enqueueRequest(purgeRequest(), resp -> completePurge(resp, callback), enqueuedTicks); + enqueueRequest(purgeRequest(), resp -> { + LOG.debug("Transaction {} purge completed", this); + parent.completeTransaction(this); + if (callback != null) { + callback.accept(resp); + } + }, enqueuedTicks); } private TransactionPurgeRequest purgeRequest() { @@ -518,14 +525,6 @@ abstract class AbstractProxyTransaction implements Identifiable resp, final Consumer> callback) { - LOG.debug("Transaction {} purge completed", this); - parent.completeTransaction(this); - if (callback != null) { - callback.accept(resp); - } - } - // Called with the connection unlocked final synchronized void startReconnect() { // At this point canCommit/directCommit are blocked, we assert a new successor state, retrieving the previous diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 49aedaf83d..02f8e5a953 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -162,7 +162,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } else if (handleReadRequest(request, callback)) { // No-op } else if (request instanceof TransactionPurgeRequest) { - sendPurge(callback); + enqueuePurge(callback); } else { throw new IllegalArgumentException("Unhandled request " + request); } @@ -202,7 +202,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { successor.abort(); } else if (request instanceof TransactionPurgeRequest) { LOG.debug("Forwarding purge {} to successor {}", request, successor); - successor.sendPurge(callback); + successor.enqueuePurge(callback); } else { throw new IllegalArgumentException("Unhandled request" + request); } @@ -214,7 +214,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { if (request instanceof AbortLocalTransactionRequest) { successor.sendAbort(request, callback); } else if (request instanceof TransactionPurgeRequest) { - successor.sendPurge(callback); + successor.enqueuePurge(callback); } else { throw new IllegalArgumentException("Unhandled request" + request); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index 34e8ba37a3..88a79775ba 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -223,7 +223,7 @@ abstract class ProxyHistory implements Identifiable { if (identifier.equals(req.getTarget())) { Verify.verify(req instanceof LocalHistoryRequest); if (req instanceof CreateLocalHistoryRequest) { - successor.connection.sendRequest(req, e.getCallback()); + successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks()); it.remove(); break; } @@ -246,7 +246,7 @@ abstract class ProxyHistory implements Identifiable { if (identifier.equals(req.getTarget())) { Verify.verify(req instanceof LocalHistoryRequest); if (req instanceof DestroyLocalHistoryRequest) { - successor.connection.sendRequest(req, e.getCallback()); + successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks()); it.remove(); break; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 0095ec58a3..6e54695532 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -361,7 +361,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { ensureFlushedBuider(); sendDoAbort(callback); } else if (request instanceof TransactionPurgeRequest) { - sendPurge(callback); + enqueuePurge(callback); } else { throw new IllegalArgumentException("Unhandled request {}" + request); }