Add JournalSegmentFile.map()
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / TransmitQueue.java
index 0313a72a8319fc107a967a2bfbb7c188b4a08ac4..cc3da1e4503118ed5a064ceeb60fd34c4fe9e12d 100644 (file)
@@ -7,21 +7,22 @@
  */
 package org.opendaylight.controller.cluster.access.client;
 
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.ActorRef;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Deque;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
-import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
-import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
@@ -52,11 +53,8 @@ import org.slf4j.LoggerFactory;
  *
  * <p>
  * This class is not thread-safe, as it is expected to be guarded by {@link AbstractClientConnection}.
- *
- * @author Robert Varga
  */
-@NotThreadSafe
-abstract class TransmitQueue {
+abstract sealed class TransmitQueue {
     static final class Halted extends TransmitQueue {
         // For ConnectingClientConnection.
         Halted(final int targetDepth) {
@@ -79,7 +77,7 @@ abstract class TransmitQueue {
         }
 
         @Override
-        void preComplete(ResponseEnvelope<?> envelope) {
+        void preComplete(final ResponseEnvelope<?> envelope) {
         }
     }
 
@@ -95,8 +93,8 @@ abstract class TransmitQueue {
         Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now,
                 final MessageSlicer messageSlicer) {
             super(oldQueue, targetDepth, now);
-            this.backend = Preconditions.checkNotNull(backend);
-            this.messageSlicer = Preconditions.checkNotNull(messageSlicer);
+            this.backend = requireNonNull(backend);
+            this.messageSlicer = requireNonNull(messageSlicer);
         }
 
         @Override
@@ -135,7 +133,7 @@ abstract class TransmitQueue {
         }
 
         @Override
-        void preComplete(ResponseEnvelope<?> envelope) {
+        void preComplete(final ResponseEnvelope<?> envelope) {
             if (envelope.getTxSequence() == currentSlicedEnvSequenceId) {
                 // Slicing completed for the prior request - clear the cached sequence id field to enable subsequent
                 // requests to be transmitted.
@@ -148,7 +146,8 @@ abstract class TransmitQueue {
 
     private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
     private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
-    private final AveragingProgressTracker tracker;  // Cannot be just ProgressTracker as we are inheriting limits.
+    // Cannot be just ProgressTracker as we are inheriting limits.
+    private final AveragingProgressTracker tracker;
     private ReconnectForwarder successor;
 
     /**
@@ -218,7 +217,7 @@ abstract class TransmitQueue {
             return Optional.empty();
         }
 
-        final TransmittedConnectionEntry entry = maybeEntry.get();
+        final TransmittedConnectionEntry entry = maybeEntry.orElseThrow();
         tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
 
         // We have freed up a slot, try to transmit something
@@ -256,7 +255,7 @@ abstract class TransmitQueue {
             return false;
         }
 
-        inflight.addLast(maybeTransmitted.get());
+        inflight.addLast(maybeTransmitted.orElseThrow());
         return true;
     }
 
@@ -338,14 +337,18 @@ abstract class TransmitQueue {
         return pending.peek();
     }
 
-    final void poison(final RequestException cause) {
-        poisonQueue(inflight, cause);
-        poisonQueue(pending, cause);
+    final List<ConnectionEntry> poison() {
+        final List<ConnectionEntry> entries = new ArrayList<>(inflight.size() + pending.size());
+        entries.addAll(inflight);
+        inflight.clear();
+        entries.addAll(pending);
+        pending.clear();
+        return entries;
     }
 
     final void setForwarder(final ReconnectForwarder forwarder, final long now) {
-        Verify.verify(successor == null, "Successor %s already set on connection %s", successor, this);
-        successor = Preconditions.checkNotNull(forwarder);
+        verify(successor == null, "Successor %s already set on connection %s", successor, this);
+        successor = requireNonNull(forwarder);
         LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
 
         /*
@@ -421,12 +424,10 @@ abstract class TransmitQueue {
             }
 
             // Check if the entry has (ever) been transmitted
-            if (!(e instanceof TransmittedConnectionEntry)) {
+            if (!(e instanceof TransmittedConnectionEntry te)) {
                 return Optional.empty();
             }
 
-            final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
-
             // Now check session match
             if (envelope.getSessionId() != te.getSessionId()) {
                 LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
@@ -444,13 +445,4 @@ abstract class TransmitQueue {
 
         return null;
     }
-
-    private static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
-        for (ConnectionEntry e : queue) {
-            final Request<?, ?> request = e.getRequest();
-            LOG.trace("Poisoning request {}", request, cause);
-            e.complete(request.toRequestFailure(cause));
-        }
-        queue.clear();
-    }
 }