X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FTransmitQueue.java;h=24264eeedff77bca598e58d4d3f4f4a6ac996c34;hp=b7543410cd1a63128ac2da7ffde001cc9b3d778f;hb=dafc95d149bc62f101de37e94b9b5e3526d4e87b;hpb=b74c6012092e47430a8f4d6f4ddeb1d3e2b1b7df diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java index b7543410cd..24264eeedf 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -54,10 +54,16 @@ import org.slf4j.LoggerFactory; @NotThreadSafe abstract class TransmitQueue { static final class Halted extends TransmitQueue { + // For ConnectingClientConnection. Halted(final int targetDepth) { super(targetDepth); } + // For ReconnectingClientConnection. + Halted(final TransmitQueue oldQueue, final long now) { + super(oldQueue, now); + } + @Override int canTransmitCount(final int inflightSize) { return 0; @@ -73,8 +79,9 @@ abstract class TransmitQueue { private final BackendInfo backend; private long nextTxSequence; - Transmitting(final int targetDepth, final BackendInfo backend) { - super(targetDepth); + // For ConnectedClientConnection. + Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now) { + super(oldQueue, targetDepth, now); this.backend = Preconditions.checkNotNull(backend); } @@ -99,13 +106,37 @@ abstract class TransmitQueue { private final Deque inflight = new ArrayDeque<>(); private final Deque pending = new ArrayDeque<>(); - private final ProgressTracker tracker; + private final AveragingProgressTracker tracker; // Cannot be just ProgressTracker as we are inheriting limits. private ReconnectForwarder successor; + /** + * Construct initial transmitting queue. + */ TransmitQueue(final int targetDepth) { tracker = new AveragingProgressTracker(targetDepth); } + /** + * Construct new transmitting queue while inheriting timing data from the previous transmit queue instance. + */ + TransmitQueue(final TransmitQueue oldQueue, final int targetDepth, final long now) { + tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now); + } + + /** + * Construct new transmitting queue while inheriting timing and size data from the previous transmit queue instance. + */ + TransmitQueue(final TransmitQueue oldQueue, final long now) { + tracker = new AveragingProgressTracker(oldQueue.tracker, now); + } + + /** + * Cancel the accumulated sum of delays as we expect the new backend to work now. + */ + void cancelDebt(final long now) { + tracker.cancelDebt(now); + } + /** * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries * to be added to it during replay. When we set the successor all entries enqueued between when this methods @@ -174,7 +205,7 @@ abstract class TransmitQueue { } private void transmitEntry(final ConnectionEntry entry, final long now) { - LOG.debug("Queue {} transmitting entry {}", entry); + LOG.debug("Queue {} transmitting entry {}", this, entry); // We are not thread-safe and are supposed to be externally-guarded, // hence send-before-record should be fine. // This needs to be revisited if the external guards are lowered. @@ -188,6 +219,7 @@ abstract class TransmitQueue { */ final long enqueue(final ConnectionEntry entry, final long now) { if (successor != null) { + // This call will pay the enqueuing price, hence the caller does not have to successor.forwardEntry(entry, now); return 0; } @@ -245,7 +277,7 @@ abstract class TransmitQueue { } final void setForwarder(final ReconnectForwarder forwarder, final long now) { - Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this); + Verify.verify(successor == null, "Successor %s already set on connection %s", successor, this); successor = Preconditions.checkNotNull(forwarder); LOG.debug("Connection {} superseded by {}, splicing queue", this, successor); @@ -257,14 +289,14 @@ abstract class TransmitQueue { int count = 0; ConnectionEntry entry = inflight.poll(); while (entry != null) { - successor.forwardEntry(entry, now); + successor.replayEntry(entry, now); entry = inflight.poll(); count++; } entry = pending.poll(); while (entry != null) { - successor.forwardEntry(entry, now); + successor.replayEntry(entry, now); entry = pending.poll(); count++; }