X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflow-protocol-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowjava%2Fprotocol%2Fimpl%2Fcore%2Fconnection%2FAbstractStackedOutboundQueue.java;fp=openflow-protocol-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowjava%2Fprotocol%2Fimpl%2Fcore%2Fconnection%2FAbstractStackedOutboundQueue.java;h=77cb688d462afa544e33c554d6822a69d1df99a3;hb=05ab0ada9e8c590102caf34b462c8d989f09d72f;hp=332f318de45fc677b39d00ac9245689361de5051;hpb=3545506a79947dec9b4ded9afe51388dadb16e27;p=openflowjava.git diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java index 332f318d..77cb688d 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java @@ -9,6 +9,7 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import io.netty.channel.Channel; import java.util.ArrayList; import java.util.Iterator; @@ -26,7 +27,7 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { private static final Logger LOG = LoggerFactory.getLogger(AbstractStackedOutboundQueue.class); - protected static final AtomicLongFieldUpdater LAST_XID_UPDATER = AtomicLongFieldUpdater + protected static final AtomicLongFieldUpdater LAST_XID_OFFSET_UPDATER = AtomicLongFieldUpdater .newUpdater(AbstractStackedOutboundQueue.class, "lastXid"); @GuardedBy("unflushedSegments") @@ -37,6 +38,7 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { protected final List uncompletedSegments = new ArrayList<>(2); private volatile long lastXid = -1; + private volatile long allocatedXid = -1; @GuardedBy("unflushedSegments") protected Integer shutdownOffset; @@ -53,6 +55,61 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { unflushedSegments.add(firstSegment); } + @GuardedBy("unflushedSegments") + protected void ensureSegment(final StackedSegment first, final int offset) { + final int segmentOffset = offset / StackedSegment.SEGMENT_SIZE; + LOG.debug("Queue {} slow offset {} maps to {} segments {}", this, offset, segmentOffset, unflushedSegments.size()); + + for (int i = unflushedSegments.size(); i <= segmentOffset; ++i) { + final StackedSegment newSegment = StackedSegment.create(first.getBaseXid() + (StackedSegment.SEGMENT_SIZE * i)); + LOG.debug("Adding segment {}", newSegment); + unflushedSegments.add(newSegment); + } + + allocatedXid = uncompletedSegments.get(uncompletedSegments.size() - 1).getEndXid(); + } + + /* + * This method is expected to be called from multiple threads concurrently. + */ + @Override + public Long reserveEntry() { + final long xid = LAST_XID_OFFSET_UPDATER.incrementAndGet(this); + final StackedSegment fastSegment = firstSegment; + + if (xid >= fastSegment.getBaseXid() + StackedSegment.SEGMENT_SIZE) { + if (xid >= allocatedXid) { + // Multiple segments, this a slow path + LOG.debug("Queue {} falling back to slow reservation for XID {}", this, xid); + + synchronized (unflushedSegments) { + LOG.debug("Queue {} executing slow reservation for XID {}", this, xid); + + // Shutdown was scheduled, need to fail the reservation + if (shutdownOffset != null) { + LOG.debug("Queue {} is being shutdown, failing reservation", this); + return null; + } + + // Ensure we have the appropriate segment for the specified XID + final StackedSegment slowSegment = firstSegment; + final int slowOffset = (int) (xid - slowSegment.getBaseXid()); + Verify.verify(slowOffset >= 0); + + // Now, we let's see if we need to allocate a new segment + ensureSegment(slowSegment, slowOffset); + + LOG.debug("Queue {} slow reservation finished", this); + } + } else { + LOG.debug("Queue {} XID {} is already backed", this, xid); + } + } + + LOG.trace("Queue {} allocated XID {}", this, xid); + return xid; + } + /** * Write some entries from the queue to the channel. Guaranteed to run * in the corresponding EventLoop. @@ -61,7 +118,71 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { * @param now * @return Number of entries written out */ - abstract int writeEntries(@Nonnull final Channel channel, final long now); + int writeEntries(@Nonnull final Channel channel, final long now) { + // Local cache + StackedSegment segment = firstSegment; + int entries = 0; + + while (channel.isWritable()) { + final OutboundQueueEntry entry = segment.getEntry(flushOffset); + if (!entry.isCommitted()) { + LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid() + flushOffset, segment, flushOffset); + break; + } + + LOG.trace("Queue {} flushing entry at offset {}", this, flushOffset); + final OfHeader message = entry.takeMessage(); + flushOffset++; + entries++; + + if (message != null) { + manager.writeMessage(message, now); + } else { + entry.complete(null); + } + + if (flushOffset >= StackedSegment.SEGMENT_SIZE) { + /* + * Slow path: purge the current segment unless it's the last one. + * If it is, we leave it for replacement when a new reservation + * is run on it. + * + * This costs us two slow paths, but hey, this should be very rare, + * so let's keep things simple. + */ + synchronized (unflushedSegments) { + LOG.debug("Flush offset {} unflushed segments {}", flushOffset, unflushedSegments.size()); + + // We may have raced ahead of reservation code and need to allocate a segment + ensureSegment(segment, flushOffset); + + // Remove the segment, update the firstSegment and reset flushOffset + final StackedSegment oldSegment = unflushedSegments.remove(0); + if (oldSegment.isComplete()) { + uncompletedSegments.remove(oldSegment); + oldSegment.recycle(); + } + + // Reset the first segment and add it to the uncompleted list + segment = unflushedSegments.get(0); + uncompletedSegments.add(segment); + + // Update the shutdown offset + if (shutdownOffset != null) { + shutdownOffset -= StackedSegment.SEGMENT_SIZE; + } + + // Allow reservations back on the fast path by publishing the new first segment + firstSegment = segment; + + flushOffset = 0; + LOG.debug("Queue {} flush moved to segment {}", this, segment); + } + } + } + + return entries; + } abstract boolean pairRequest(final OfHeader message); @@ -92,7 +213,7 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { // Increment the offset by the segment size, preventing fast path allocations, // since we are holding the slow path lock, any reservations will see the queue // in shutdown and fail accordingly. - final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE); + final long xid = LAST_XID_OFFSET_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE); shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE); return lockedShutdownFlush();