BUG-5280: make sure we arm the request timer 00/53900/4
authorRobert Varga <rovarga@cisco.com>
Mon, 27 Mar 2017 14:19:25 +0000 (16:19 +0200)
committerTom Pantelis <tompantelis@gmail.com>
Wed, 29 Mar 2017 15:36:47 +0000 (15:36 +0000)
The timer which is supposed to timeout requests and detect
overall badness of the backeend was not being armed. Fix that
by scheduling it whenever we make the queue non-empty.

Change-Id: I9d8be694e3ed5154b66baca76c0788840a38c2f7
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java

index ac4ac78..69deb01 100644 (file)
@@ -44,12 +44,18 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     @VisibleForTesting
     static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
 
+    private static final FiniteDuration REQUEST_TIMEOUT_DURATION = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
+        TimeUnit.NANOSECONDS);
+
     private final Lock lock = new ReentrantLock();
     private final ClientActorContext context;
     @GuardedBy("lock")
     private final TransmitQueue queue;
     private final Long cookie;
 
+    @GuardedBy("lock")
+    private boolean haveTimer;
+
     private volatile RequestException poisoned;
 
     // Do not allow subclassing outside of this package
@@ -108,7 +114,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
     @GuardedBy("lock")
     final void finishReplay(final ReconnectForwarder forwarder) {
-        queue.setForwarder(forwarder, readTime());
+        setForwarder(forwarder);
         lock.unlock();
     }
 
@@ -127,6 +133,10 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     final long enqueueEntry(final ConnectionEntry entry, final long now) {
         lock.lock();
         try {
+            if (queue.isEmpty()) {
+                // The queue is becoming non-empty, schedule a timer
+                scheduleTimer(REQUEST_TIMEOUT_DURATION);
+            }
             return queue.enqueue(entry, now);
         } finally {
             lock.unlock();
@@ -138,7 +148,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         try {
             TimeUnit.NANOSECONDS.sleep(delay);
         } catch (InterruptedException e) {
-            LOG.debug("Interrupted while sleeping");
+            LOG.debug("Interrupted while sleeping", e);
         }
     }
 
@@ -147,9 +157,19 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      *
      * @param delay Delay, in nanoseconds
      */
+    @GuardedBy("lock")
     private void scheduleTimer(final FiniteDuration delay) {
+        if (haveTimer) {
+            LOG.debug("{}: timer already scheduled", context.persistenceId());
+            return;
+        }
+        if (queue.hasSuccessor()) {
+            LOG.debug("{}: connection has successor, not scheduling timer", context.persistenceId());
+            return;
+        }
         LOG.debug("{}: scheduling timeout in {}", context.persistenceId(), delay);
         context.executeInActor(this::runTimer, delay);
+        haveTimer = true;
     }
 
     /**
@@ -165,6 +185,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
         lock.lock();
         try {
+            haveTimer = false;
             final long now = readTime();
             // The following line is only reliable when queue is not forwarding, but such state should not last long.
             final long ticksSinceProgress = queue.ticksStalling(now);
@@ -185,15 +206,15 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
                 // We have timed out. There is no point in scheduling a timer
                 return reconnectConnection(current);
             }
+
+            if (delay.isPresent()) {
+                // If there is new delay, schedule a timer
+                scheduleTimer(delay.get());
+            }
         } finally {
             lock.unlock();
         }
 
-        if (delay.isPresent()) {
-            // If there is new delay, schedule a timer
-            scheduleTimer(delay.get());
-        }
-
         return current;
     }
 
index 4a1b3a2..d384ba4 100644 (file)
@@ -112,6 +112,10 @@ abstract class TransmitQueue {
         return tracker.ticksStalling(now);
     }
 
+    final boolean hasSuccessor() {
+        return successor != null;
+    }
+
     // If a matching request was found, this will track a task was closed.
     final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
         Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);