BUG-8422: Propagate enqueue time
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / AbstractClientConnection.java
index 2423473472301b8b50a77b4758cc3b7eee138f03..d37893bd7c3a9862ad89fb9d88b94337d0355f13 100644 (file)
@@ -87,6 +87,10 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         return context.self();
     }
 
+    public final long currentTime() {
+        return context.ticker().read();
+    }
+
     /**
      * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
      * from any thread.
@@ -98,13 +102,31 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      * @param callback Callback to invoke
      */
     public final void sendRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
-        final RequestException maybePoison = poisoned;
-        if (maybePoison != null) {
-            throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
+        final long now = currentTime();
+        final long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now);
+        try {
+            TimeUnit.NANOSECONDS.sleep(delay);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.debug("Interrupted after sleeping {}ns", e, currentTime() - now);
         }
+    }
 
-        final ConnectionEntry entry = new ConnectionEntry(request, callback, readTime());
-        enqueueAndWait(entry, entry.getEnqueuedTicks());
+    /**
+     * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
+     * from any thread.
+     *
+     * <p>
+     * Note that unlike {@link #sendRequest(Request, Consumer)}, this method does not exert backpressure, hence it
+     * should never be called from an application thread.
+     *
+     * @param request Request to send
+     * @param callback Callback to invoke
+     * @param enqueuedTicks Time (according to {@link #currentTime()} of request enqueue
+     */
+    public final void enqueueRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
+            final long enqueuedTicks) {
+        enqueueEntry(new ConnectionEntry(request, callback, enqueuedTicks), currentTime());
     }
 
     public abstract Optional<T> getBackendInfo();
@@ -122,19 +144,20 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
     @GuardedBy("lock")
     final void setForwarder(final ReconnectForwarder forwarder) {
-        queue.setForwarder(forwarder, readTime());
+        queue.setForwarder(forwarder, currentTime());
     }
 
     @GuardedBy("lock")
     abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current);
 
-    private long readTime() {
-        return context.ticker().read();
-    }
-
     final long enqueueEntry(final ConnectionEntry entry, final long now) {
         lock.lock();
         try {
+            final RequestException maybePoison = poisoned;
+            if (maybePoison != null) {
+                throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
+            }
+
             if (queue.isEmpty()) {
                 // The queue is becoming non-empty, schedule a timer
                 scheduleTimer(REQUEST_TIMEOUT_DURATION);
@@ -145,15 +168,6 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         }
     }
 
-    final void enqueueAndWait(final ConnectionEntry entry, final long now) {
-        final long delay = enqueueEntry(entry, now);
-        try {
-            TimeUnit.NANOSECONDS.sleep(delay);
-        } catch (InterruptedException e) {
-            LOG.debug("Interrupted while sleeping", e);
-        }
-    }
-
     final ClientActorBehavior<T> reconnect(final ClientActorBehavior<T> current) {
         lock.lock();
         try {
@@ -197,7 +211,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         lock.lock();
         try {
             haveTimer = false;
-            final long now = readTime();
+            final long now = currentTime();
             // The following line is only reliable when queue is not forwarding, but such state should not last long.
             final long ticksSinceProgress = queue.ticksStalling(now);
             if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
@@ -284,7 +298,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     }
 
     final void receiveResponse(final ResponseEnvelope<?> envelope) {
-        final long now = readTime();
+        final long now = currentTime();
 
         final Optional<TransmittedConnectionEntry> maybeEntry;
         lock.lock();