X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FTransmitQueue.java;h=71de580bd3f291d200170078db9ab939187d6da9;hb=HEAD;hp=0313a72a8319fc107a967a2bfbb7c188b4a08ac4;hpb=d7c9a8ccfcb57f005490a226803d094289997ef9;p=controller.git diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java index 0313a72a83..cc3da1e450 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -7,21 +7,22 @@ */ package org.opendaylight.controller.cluster.access.client; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Verify; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collection; import java.util.Deque; import java.util.Iterator; +import java.util.List; import java.util.Optional; import java.util.Queue; -import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; -import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; @@ -52,11 +53,8 @@ import org.slf4j.LoggerFactory; * *

* This class is not thread-safe, as it is expected to be guarded by {@link AbstractClientConnection}. - * - * @author Robert Varga */ -@NotThreadSafe -abstract class TransmitQueue { +abstract sealed class TransmitQueue { static final class Halted extends TransmitQueue { // For ConnectingClientConnection. Halted(final int targetDepth) { @@ -79,7 +77,7 @@ abstract class TransmitQueue { } @Override - void preComplete(ResponseEnvelope envelope) { + void preComplete(final ResponseEnvelope envelope) { } } @@ -95,8 +93,8 @@ abstract class TransmitQueue { Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now, final MessageSlicer messageSlicer) { super(oldQueue, targetDepth, now); - this.backend = Preconditions.checkNotNull(backend); - this.messageSlicer = Preconditions.checkNotNull(messageSlicer); + this.backend = requireNonNull(backend); + this.messageSlicer = requireNonNull(messageSlicer); } @Override @@ -135,7 +133,7 @@ abstract class TransmitQueue { } @Override - void preComplete(ResponseEnvelope envelope) { + void preComplete(final ResponseEnvelope envelope) { if (envelope.getTxSequence() == currentSlicedEnvSequenceId) { // Slicing completed for the prior request - clear the cached sequence id field to enable subsequent // requests to be transmitted. @@ -148,7 +146,8 @@ abstract class TransmitQueue { private final Deque inflight = new ArrayDeque<>(); private final Deque pending = new ArrayDeque<>(); - private final AveragingProgressTracker tracker; // Cannot be just ProgressTracker as we are inheriting limits. + // Cannot be just ProgressTracker as we are inheriting limits. + private final AveragingProgressTracker tracker; private ReconnectForwarder successor; /** @@ -218,7 +217,7 @@ abstract class TransmitQueue { return Optional.empty(); } - final TransmittedConnectionEntry entry = maybeEntry.get(); + final TransmittedConnectionEntry entry = maybeEntry.orElseThrow(); tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos()); // We have freed up a slot, try to transmit something @@ -256,7 +255,7 @@ abstract class TransmitQueue { return false; } - inflight.addLast(maybeTransmitted.get()); + inflight.addLast(maybeTransmitted.orElseThrow()); return true; } @@ -338,14 +337,18 @@ abstract class TransmitQueue { return pending.peek(); } - final void poison(final RequestException cause) { - poisonQueue(inflight, cause); - poisonQueue(pending, cause); + final List poison() { + final List entries = new ArrayList<>(inflight.size() + pending.size()); + entries.addAll(inflight); + inflight.clear(); + entries.addAll(pending); + pending.clear(); + return entries; } final void setForwarder(final ReconnectForwarder forwarder, final long now) { - Verify.verify(successor == null, "Successor %s already set on connection %s", successor, this); - successor = Preconditions.checkNotNull(forwarder); + verify(successor == null, "Successor %s already set on connection %s", successor, this); + successor = requireNonNull(forwarder); LOG.debug("Connection {} superseded by {}, splicing queue", this, successor); /* @@ -421,12 +424,10 @@ abstract class TransmitQueue { } // Check if the entry has (ever) been transmitted - if (!(e instanceof TransmittedConnectionEntry)) { + if (!(e instanceof TransmittedConnectionEntry te)) { return Optional.empty(); } - final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e; - // Now check session match if (envelope.getSessionId() != te.getSessionId()) { LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope); @@ -444,13 +445,4 @@ abstract class TransmitQueue { return null; } - - private static void poisonQueue(final Queue queue, final RequestException cause) { - for (ConnectionEntry e : queue) { - final Request request = e.getRequest(); - LOG.trace("Poisoning request {}", request, cause); - e.complete(request.toRequestFailure(cause)); - } - queue.clear(); - } }