X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FTransmitQueue.java;h=b7543410cd1a63128ac2da7ffde001cc9b3d778f;hb=bc2b83e97bc73930badd4a3063c65b849f82c664;hp=15ad958304f9b61cd1a20b322b12c7354542759e;hpb=503d824302de98ae7d9fd44c6c417ed651865919;p=controller.git diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java index 15ad958304..b7543410cd 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -11,9 +11,9 @@ import akka.actor.ActorRef; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Verify; -import com.google.common.collect.Iterables; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayDeque; +import java.util.Collection; import java.util.Deque; import java.util.Iterator; import java.util.Optional; @@ -106,8 +106,20 @@ abstract class TransmitQueue { tracker = new AveragingProgressTracker(targetDepth); } - final Iterable asIterable() { - return Iterables.concat(inflight, pending); + /** + * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries + * to be added to it during replay. When we set the successor all entries enqueued between when this methods + * returns and the successor is set will be replayed to the successor. + * + * @return Collection of entries present in the queue. + */ + final Collection drain() { + final Collection ret = new ArrayDeque<>(inflight.size() + pending.size()); + ret.addAll(inflight); + ret.addAll(pending); + inflight.clear(); + pending.clear(); + return ret; } final long ticksStalling(final long now) { @@ -135,12 +147,16 @@ abstract class TransmitQueue { tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos()); // We have freed up a slot, try to transmit something + tryTransmit(now); + + return Optional.of(entry); + } + + final void tryTransmit(final long now) { final int toSend = canTransmitCount(inflight.size()); if (toSend > 0 && !pending.isEmpty()) { transmitEntries(toSend, now); } - - return Optional.of(entry); } private void transmitEntries(final int maxTransmit, final long now) { @@ -233,16 +249,36 @@ abstract class TransmitQueue { successor = Preconditions.checkNotNull(forwarder); LOG.debug("Connection {} superseded by {}, splicing queue", this, successor); + /* + * We need to account for entries which have been added between the time drain() was called and this method + * is invoked. Since the old connection is visible during replay and some entries may have completed on the + * replay thread, there was an avenue for this to happen. + */ + int count = 0; ConnectionEntry entry = inflight.poll(); while (entry != null) { successor.forwardEntry(entry, now); entry = inflight.poll(); + count++; } entry = pending.poll(); while (entry != null) { successor.forwardEntry(entry, now); entry = pending.poll(); + count++; + } + + LOG.debug("Connection {} queue spliced {} messages", this, count); + } + + final void remove(final long now) { + final TransmittedConnectionEntry txe = inflight.poll(); + if (txe == null) { + final ConnectionEntry entry = pending.pop(); + tracker.closeTask(now, entry.getEnqueuedTicks(), 0, 0); + } else { + tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0); } } @@ -318,5 +354,4 @@ abstract class TransmitQueue { } queue.clear(); } - }