Use OptionalLong in AbstractClientConnection
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / AbstractClientConnection.java
index 03d4691cb44fe14eb05c33790632859c17d14b8f..9c290df13da3f61da22f0f728eaa3b5a150f255f 100644 (file)
@@ -15,14 +15,16 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.NotThreadSafe;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
@@ -36,11 +38,10 @@ import scala.concurrent.duration.FiniteDuration;
 /**
  * Base class for a connection to the backend. Responsible to queueing and dispatch of requests toward the backend.
  * Can be in three conceptual states: Connecting, Connected and Reconnecting, which are represented by public final
- * classes exposed from this package.
+ * classes exposed from this package. This class NOT thread-safe, not are its subclasses expected to be thread-safe.
  *
  * @author Robert Varga
  */
-@NotThreadSafe
 public abstract class AbstractClientConnection<T extends BackendInfo> {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientConnection.class);
 
@@ -198,7 +199,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         }
     }
 
-    @GuardedBy("lock")
+    @Holding("lock")
     private void commonEnqueue(final ConnectionEntry entry, final long now) {
         final RequestException maybePoison = poisoned;
         if (maybePoison != null) {
@@ -223,7 +224,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         return queue.drain();
     }
 
-    @GuardedBy("lock")
+    @Holding("lock")
     final void finishReplay(final ReconnectForwarder forwarder) {
         setForwarder(forwarder);
 
@@ -243,12 +244,12 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         lock.unlock();
     }
 
-    @GuardedBy("lock")
+    @Holding("lock")
     final void setForwarder(final ReconnectForwarder forwarder) {
         queue.setForwarder(forwarder, currentTime());
     }
 
-    @GuardedBy("lock")
+    @Holding("lock")
     abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current,
             RequestException runtimeRequestException);
 
@@ -287,7 +288,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      *
      * @param delay Delay, in nanoseconds
      */
-    @GuardedBy("lock")
+    @Holding("lock")
     private void scheduleTimer(final long delay) {
         if (haveTimer) {
             LOG.debug("{}: timer already scheduled on {}", context.persistenceId(), this);
@@ -317,9 +318,10 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      */
     @VisibleForTesting
     final ClientActorBehavior<T> runTimer(final ClientActorBehavior<T> current) {
-        final Optional<Long> delay;
-
         lock.lock();
+
+        final List<ConnectionEntry> poisonEntries;
+        final NoProgressException poisonCause;
         try {
             haveTimer = false;
             final long now = currentTime();
@@ -329,41 +331,43 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             // The following line is only reliable when queue is not forwarding, but such state should not last long.
             // FIXME: BUG-8422: this may not be accurate w.r.t. replayed entries
             final long ticksSinceProgress = queue.ticksStalling(now);
-            if (ticksSinceProgress >= context.config().getNoProgressTimeout()) {
-                LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
-                    TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
+            if (ticksSinceProgress < context.config().getNoProgressTimeout()) {
+                // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward.
+                // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual
+                // tri-state return convention.
+                final OptionalLong delay = lockedCheckTimeout(now);
+                if (delay == null) {
+                    // We have timed out. There is no point in scheduling a timer
+                    LOG.debug("{}: connection {} timed out", context.persistenceId(), this);
+                    return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out",
+                        new TimeoutException()));
+                }
 
-                lockedPoison(new NoProgressException(ticksSinceProgress));
-                current.removeConnection(this);
-                return current;
-            }
+                if (delay.isPresent()) {
+                    // If there is new delay, schedule a timer
+                    scheduleTimer(delay.getAsLong());
+                } else {
+                    LOG.debug("{}: not scheduling timeout on {}", context.persistenceId(), this);
+                }
 
-            // Requests are always scheduled in sequence, hence checking for timeout is relatively straightforward.
-            // Note we use also inquire about the delay, so we can re-schedule if needed, hence the unusual tri-state
-            // return convention.
-            delay = lockedCheckTimeout(now);
-            if (delay == null) {
-                // We have timed out. There is no point in scheduling a timer
-                LOG.debug("{}: connection {} timed out", context.persistenceId(), this);
-                return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out",
-                    new TimeoutException()));
+                return current;
             }
 
-            if (delay.isPresent()) {
-                // If there is new delay, schedule a timer
-                scheduleTimer(delay.get());
-            } else {
-                LOG.debug("{}: not scheduling timeout on {}", context.persistenceId(), this);
-            }
+            LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
+                TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
+            poisonCause = new NoProgressException(ticksSinceProgress);
+            poisonEntries = lockedPoison(poisonCause);
+            current.removeConnection(this);
         } finally {
             lock.unlock();
         }
 
+        poison(poisonEntries, poisonCause);
         return current;
     }
 
     @VisibleForTesting
-    final Optional<Long> checkTimeout(final long now) {
+    final OptionalLong checkTimeout(final long now) {
         lock.lock();
         try {
             return lockedCheckTimeout(now);
@@ -385,10 +389,10 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
             justification = "Returning null Optional is documented in the API contract.")
     @GuardedBy("lock")
-    private Optional<Long> lockedCheckTimeout(final long now) {
+    private OptionalLong lockedCheckTimeout(final long now) {
         if (queue.isEmpty()) {
             LOG.debug("{}: connection {} is empty", context.persistenceId(), this);
-            return Optional.empty();
+            return OptionalLong.empty();
         }
 
         final long backendSilentTicks = backendSilentTicks(now);
@@ -403,7 +407,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             final long beenOpen = now - head.getEnqueuedTicks();
             final long requestTimeout = context.config().getRequestTimeout();
             if (beenOpen < requestTimeout) {
-                return Optional.of(requestTimeout - beenOpen);
+                return OptionalLong.of(requestTimeout - beenOpen);
             }
 
             tasksTimedOut++;
@@ -418,7 +422,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             queue.tryTransmit(now);
         }
 
-        return Optional.empty();
+        return OptionalLong.empty();
     }
 
     private void timeoutEntry(final ConnectionEntry entry, final long beenOpen) {
@@ -436,18 +440,31 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     }
 
     final void poison(final RequestException cause) {
+        final List<ConnectionEntry> entries;
+
         lock.lock();
         try {
-            lockedPoison(cause);
+            entries = lockedPoison(cause);
         } finally {
             lock.unlock();
         }
+
+        poison(entries, cause);
     }
 
-    @GuardedBy("lock")
-    private void lockedPoison(final RequestException cause) {
+    // Do not hold any locks while calling this
+    private static void poison(final Collection<? extends ConnectionEntry> entries, final RequestException cause) {
+        for (ConnectionEntry e : entries) {
+            final Request<?, ?> request = e.getRequest();
+            LOG.trace("Poisoning request {}", request, cause);
+            e.complete(request.toRequestFailure(cause));
+        }
+    }
+
+    @Holding("lock")
+    private List<ConnectionEntry> lockedPoison(final RequestException cause) {
         poisoned = enrichPoison(cause);
-        queue.poison(cause);
+        return queue.poison();
     }
 
     RequestException enrichPoison(final RequestException ex) {