From: Vaclav Demcak Date: Wed, 7 Oct 2015 15:43:58 +0000 (+0200) Subject: Barrier turn on/off - move more functionality from StackedOutboundQueue X-Git-Tag: release/lithium-sr4~11^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=05ab0ada9e8c590102caf34b462c8d989f09d72f;hp=3545506a79947dec9b4ded9afe51388dadb16e27;p=openflowjava.git Barrier turn on/off - move more functionality from StackedOutboundQueue * move more functionality for reusing to AbstractStackedOutboundQueue * add general methods for Channel msg wrapper * fix NPE from OFEncoder Change-Id: I351300c4af40693ba444d3c10a1121e76d004d1b Signed-off-by: Vaclav Demcak --- diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java index 332f318d..77cb688d 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java @@ -9,6 +9,7 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import io.netty.channel.Channel; import java.util.ArrayList; import java.util.Iterator; @@ -26,7 +27,7 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { private static final Logger LOG = LoggerFactory.getLogger(AbstractStackedOutboundQueue.class); - protected static final AtomicLongFieldUpdater LAST_XID_UPDATER = AtomicLongFieldUpdater + protected static final AtomicLongFieldUpdater LAST_XID_OFFSET_UPDATER = AtomicLongFieldUpdater .newUpdater(AbstractStackedOutboundQueue.class, "lastXid"); @GuardedBy("unflushedSegments") @@ -37,6 +38,7 @@ abstract class 
AbstractStackedOutboundQueue implements OutboundQueue { protected final List uncompletedSegments = new ArrayList<>(2); private volatile long lastXid = -1; + private volatile long allocatedXid = -1; @GuardedBy("unflushedSegments") protected Integer shutdownOffset; @@ -53,6 +55,61 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { unflushedSegments.add(firstSegment); } + @GuardedBy("unflushedSegments") + protected void ensureSegment(final StackedSegment first, final int offset) { + final int segmentOffset = offset / StackedSegment.SEGMENT_SIZE; + LOG.debug("Queue {} slow offset {} maps to {} segments {}", this, offset, segmentOffset, unflushedSegments.size()); + + for (int i = unflushedSegments.size(); i <= segmentOffset; ++i) { + final StackedSegment newSegment = StackedSegment.create(first.getBaseXid() + (StackedSegment.SEGMENT_SIZE * i)); + LOG.debug("Adding segment {}", newSegment); + unflushedSegments.add(newSegment); + } + + allocatedXid = uncompletedSegments.get(uncompletedSegments.size() - 1).getEndXid(); + } + + /* + * This method is expected to be called from multiple threads concurrently. 
+ */ + @Override + public Long reserveEntry() { + final long xid = LAST_XID_OFFSET_UPDATER.incrementAndGet(this); + final StackedSegment fastSegment = firstSegment; + + if (xid >= fastSegment.getBaseXid() + StackedSegment.SEGMENT_SIZE) { + if (xid >= allocatedXid) { + // Multiple segments, this is a slow path + LOG.debug("Queue {} falling back to slow reservation for XID {}", this, xid); + + synchronized (unflushedSegments) { + LOG.debug("Queue {} executing slow reservation for XID {}", this, xid); + + // Shutdown was scheduled, need to fail the reservation + if (shutdownOffset != null) { + LOG.debug("Queue {} is being shutdown, failing reservation", this); + return null; + } + + // Ensure we have the appropriate segment for the specified XID + final StackedSegment slowSegment = firstSegment; + final int slowOffset = (int) (xid - slowSegment.getBaseXid()); + Verify.verify(slowOffset >= 0); + + // Now let's see if we need to allocate a new segment + ensureSegment(slowSegment, slowOffset); + + LOG.debug("Queue {} slow reservation finished", this); + } + } else { + LOG.debug("Queue {} XID {} is already backed", this, xid); + } + } + + LOG.trace("Queue {} allocated XID {}", this, xid); + return xid; + } + + /** * Write some entries from the queue to the channel. Guaranteed to run * in the corresponding EventLoop. 
@@ -61,7 +118,71 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { * @param now * @return Number of entries written out */ - abstract int writeEntries(@Nonnull final Channel channel, final long now); + int writeEntries(@Nonnull final Channel channel, final long now) { + // Local cache + StackedSegment segment = firstSegment; + int entries = 0; + + while (channel.isWritable()) { + final OutboundQueueEntry entry = segment.getEntry(flushOffset); + if (!entry.isCommitted()) { + LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid() + flushOffset, segment, flushOffset); + break; + } + + LOG.trace("Queue {} flushing entry at offset {}", this, flushOffset); + final OfHeader message = entry.takeMessage(); + flushOffset++; + entries++; + + if (message != null) { + manager.writeMessage(message, now); + } else { + entry.complete(null); + } + + if (flushOffset >= StackedSegment.SEGMENT_SIZE) { + /* + * Slow path: purge the current segment unless it's the last one. + * If it is, we leave it for replacement when a new reservation + * is run on it. + * + * This costs us two slow paths, but hey, this should be very rare, + * so let's keep things simple. 
+ */ + synchronized (unflushedSegments) { + LOG.debug("Flush offset {} unflushed segments {}", flushOffset, unflushedSegments.size()); + + // We may have raced ahead of reservation code and need to allocate a segment + ensureSegment(segment, flushOffset); + + // Remove the segment, update the firstSegment and reset flushOffset + final StackedSegment oldSegment = unflushedSegments.remove(0); + if (oldSegment.isComplete()) { + uncompletedSegments.remove(oldSegment); + oldSegment.recycle(); + } + + // Reset the first segment and add it to the uncompleted list + segment = unflushedSegments.get(0); + uncompletedSegments.add(segment); + + // Update the shutdown offset + if (shutdownOffset != null) { + shutdownOffset -= StackedSegment.SEGMENT_SIZE; + } + + // Allow reservations back on the fast path by publishing the new first segment + firstSegment = segment; + + flushOffset = 0; + LOG.debug("Queue {} flush moved to segment {}", this, segment); + } + } + } + + return entries; + } abstract boolean pairRequest(final OfHeader message); @@ -92,7 +213,7 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { // Increment the offset by the segment size, preventing fast path allocations, // since we are holding the slow path lock, any reservations will see the queue // in shutdown and fail accordingly. 
- final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE); + final long xid = LAST_XID_OFFSET_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE); shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE); return lockedShutdownFlush(); diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java index 46039904..f19d9aa1 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java @@ -10,11 +10,8 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import com.google.common.util.concurrent.FutureCallback; -import io.netty.channel.Channel; import java.util.Iterator; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import javax.annotation.Nonnull; -import javax.annotation.concurrent.GuardedBy; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,68 +20,12 @@ final class StackedOutboundQueue extends AbstractStackedOutboundQueue { private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class); private static final AtomicLongFieldUpdater BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid"); - private volatile long allocatedXid = -1; private volatile long barrierXid = -1; StackedOutboundQueue(final AbstractOutboundQueueManager manager) { super(manager); } - @GuardedBy("unflushedSegments") - private void ensureSegment(final StackedSegment first, 
final int offset) { - final int segmentOffset = offset / StackedSegment.SEGMENT_SIZE; - LOG.debug("Queue {} slow offset {} maps to {} segments {}", this, offset, segmentOffset, unflushedSegments.size()); - - for (int i = unflushedSegments.size(); i <= segmentOffset; ++i) { - final StackedSegment newSegment = StackedSegment.create(first.getBaseXid() + (StackedSegment.SEGMENT_SIZE * i)); - LOG.debug("Adding segment {}", newSegment); - unflushedSegments.add(newSegment); - } - - allocatedXid = uncompletedSegments.get(uncompletedSegments.size() - 1).getEndXid(); - } - - /* - * This method is expected to be called from multiple threads concurrently. - */ - @Override - public Long reserveEntry() { - final long xid = LAST_XID_UPDATER.incrementAndGet(this); - final StackedSegment fastSegment = firstSegment; - - if (xid >= fastSegment.getBaseXid() + StackedSegment.SEGMENT_SIZE) { - if (xid >= allocatedXid) { - // Multiple segments, this a slow path - LOG.debug("Queue {} falling back to slow reservation for XID {}", this, xid); - - synchronized (unflushedSegments) { - LOG.debug("Queue {} executing slow reservation for XID {}", this, xid); - - // Shutdown was scheduled, need to fail the reservation - if (shutdownOffset != null) { - LOG.debug("Queue {} is being shutdown, failing reservation", this); - return null; - } - - // Ensure we have the appropriate segment for the specified XID - final StackedSegment slowSegment = firstSegment; - final int slowOffset = (int) (xid - slowSegment.getBaseXid()); - Verify.verify(slowOffset >= 0); - - // Now, we let's see if we need to allocate a new segment - ensureSegment(slowSegment, slowOffset); - - LOG.debug("Queue {} slow reservation finished", this); - } - } else { - LOG.debug("Queue {} XID {} is already backed", this, xid); - } - } - - LOG.trace("Queue {} allocated XID {}", this, xid); - return xid; - } - /* * This method is expected to be called from multiple threads concurrently */ @@ -140,73 +81,6 @@ final class StackedOutboundQueue 
extends AbstractStackedOutboundQueue { manager.ensureFlushing(); } - @Override - int writeEntries(@Nonnull final Channel channel, final long now) { - // Local cache - StackedSegment segment = firstSegment; - int entries = 0; - - while (channel.isWritable()) { - final OutboundQueueEntry entry = segment.getEntry(flushOffset); - if (!entry.isCommitted()) { - LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid() + flushOffset, segment, flushOffset); - break; - } - - LOG.trace("Queue {} flushing entry at offset {}", this, flushOffset); - final OfHeader message = entry.takeMessage(); - flushOffset++; - entries++; - - if (message != null) { - manager.writeMessage(message, now); - } else { - entry.complete(null); - } - - if (flushOffset >= StackedSegment.SEGMENT_SIZE) { - /* - * Slow path: purge the current segment unless it's the last one. - * If it is, we leave it for replacement when a new reservation - * is run on it. - * - * This costs us two slow paths, but hey, this should be very rare, - * so let's keep things simple. 
- */ - synchronized (unflushedSegments) { - LOG.debug("Flush offset {} unflushed segments {}", flushOffset, unflushedSegments.size()); - - // We may have raced ahead of reservation code and need to allocate a segment - ensureSegment(segment, flushOffset); - - // Remove the segment, update the firstSegment and reset flushOffset - final StackedSegment oldSegment = unflushedSegments.remove(0); - if (oldSegment.isComplete()) { - uncompletedSegments.remove(oldSegment); - oldSegment.recycle(); - } - - // Reset the first segment and add it to the uncompleted list - segment = unflushedSegments.get(0); - uncompletedSegments.add(segment); - - // Update the shutdown offset - if (shutdownOffset != null) { - shutdownOffset -= StackedSegment.SEGMENT_SIZE; - } - - // Allow reservations back on the fast path by publishing the new first segment - firstSegment = segment; - - flushOffset = 0; - LOG.debug("Queue {} flush moved to segment {}", this, segment); - } - } - } - - return entries; - } - Long reserveBarrierIfNeeded() { final long bXid = barrierXid; final long fXid = firstSegment.getBaseXid() + flushOffset;