*/
package org.opendaylight.controller.cluster.access.client;
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
+import java.util.List;
import java.util.Optional;
import java.util.Queue;
-import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
-import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
*
* <p>
* This class is not thread-safe, as it is expected to be guarded by {@link AbstractClientConnection}.
- *
- * @author Robert Varga
*/
-@NotThreadSafe
-abstract class TransmitQueue {
+abstract sealed class TransmitQueue {
static final class Halted extends TransmitQueue {
// For ConnectingClientConnection.
Halted(final int targetDepth) {
}
@Override
- void preComplete(ResponseEnvelope<?> envelope) {
+ void preComplete(final ResponseEnvelope<?> envelope) {
}
}
Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now,
final MessageSlicer messageSlicer) {
super(oldQueue, targetDepth, now);
- this.backend = Preconditions.checkNotNull(backend);
- this.messageSlicer = Preconditions.checkNotNull(messageSlicer);
+ this.backend = requireNonNull(backend);
+ this.messageSlicer = requireNonNull(messageSlicer);
}
@Override
}
@Override
- void preComplete(ResponseEnvelope<?> envelope) {
+ void preComplete(final ResponseEnvelope<?> envelope) {
if (envelope.getTxSequence() == currentSlicedEnvSequenceId) {
// Slicing completed for the prior request - clear the cached sequence id field to enable subsequent
// requests to be transmitted.
private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
- private final AveragingProgressTracker tracker; // Cannot be just ProgressTracker as we are inheriting limits.
+ // Cannot be just ProgressTracker as we are inheriting limits.
+ private final AveragingProgressTracker tracker;
private ReconnectForwarder successor;
/**
return Optional.empty();
}
- final TransmittedConnectionEntry entry = maybeEntry.get();
+ final TransmittedConnectionEntry entry = maybeEntry.orElseThrow();
tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
// We have freed up a slot, try to transmit something
return false;
}
- inflight.addLast(maybeTransmitted.get());
+ inflight.addLast(maybeTransmitted.orElseThrow());
return true;
}
return pending.peek();
}
- final void poison(final RequestException cause) {
- poisonQueue(inflight, cause);
- poisonQueue(pending, cause);
+ final List<ConnectionEntry> poison() {
+ final List<ConnectionEntry> entries = new ArrayList<>(inflight.size() + pending.size());
+ entries.addAll(inflight);
+ inflight.clear();
+ entries.addAll(pending);
+ pending.clear();
+ return entries;
}
final void setForwarder(final ReconnectForwarder forwarder, final long now) {
- Verify.verify(successor == null, "Successor %s already set on connection %s", successor, this);
- successor = Preconditions.checkNotNull(forwarder);
+ verify(successor == null, "Successor %s already set on connection %s", successor, this);
+ successor = requireNonNull(forwarder);
LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
/*
}
// Check if the entry has (ever) been transmitted
- if (!(e instanceof TransmittedConnectionEntry)) {
+ if (!(e instanceof TransmittedConnectionEntry te)) {
return Optional.empty();
}
- final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
-
// Now check session match
if (envelope.getSessionId() != te.getSessionId()) {
LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
return null;
}
-
- private static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
- for (ConnectionEntry e : queue) {
- final Request<?, ?> request = e.getRequest();
- LOG.trace("Poisoning request {}", request, cause);
- e.complete(request.toRequestFailure(cause));
- }
- queue.clear();
- }
}