X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflow-protocol-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowjava%2Fprotocol%2Fimpl%2Fcore%2Fconnection%2FStackedOutboundQueue.java;h=46039904543950e1a074a0381b7d7414210f26bd;hb=refs%2Fchanges%2F98%2F27498%2F6;hp=1da6dd3dff01203d3eaea854924c05a18e902ec6;hpb=deca37b27340c6e9b1576c281982d5d7f4ee1795;p=openflowjava.git

diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java
index 1da6dd3d..46039904 100644
--- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java
+++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java
@@ -11,46 +11,23 @@
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.util.concurrent.FutureCallback;
 import io.netty.channel.Channel;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class StackedOutboundQueue implements OutboundQueue {
+final class StackedOutboundQueue extends AbstractStackedOutboundQueue {
     private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class);
     private static final AtomicLongFieldUpdater<StackedOutboundQueue> BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid");
-    private static final AtomicLongFieldUpdater<StackedOutboundQueue> LAST_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "lastXid");
-
-    @GuardedBy("unflushedSegments")
-    private volatile StackedSegment firstSegment;
-    @GuardedBy("unflushedSegments")
-    private final List<StackedSegment> unflushedSegments = new ArrayList<>(2);
-    @GuardedBy("unflushedSegments")
-    private final List<StackedSegment> uncompletedSegments = new ArrayList<>(2);
-    private final AbstractOutboundQueueManager manager;
 
     private volatile long allocatedXid = -1;
     private volatile long barrierXid = -1;
-    private volatile long lastXid = -1;
-    @GuardedBy("unflushedSegments")
-    private Integer shutdownOffset;
-
-    // Accessed from Netty only
-    private int flushOffset;
-
-    StackedOutboundQueue(final AbstractOutboundQueueManager manager) {
-        this.manager = Preconditions.checkNotNull(manager);
-        firstSegment = StackedSegment.create(0L);
-        uncompletedSegments.add(firstSegment);
-        unflushedSegments.add(firstSegment);
+    StackedOutboundQueue(final AbstractOutboundQueueManager manager) {
+        super(manager);
     }
 
     @GuardedBy("unflushedSegments")
@@ -163,14 +140,7 @@ final class StackedOutboundQueue implements OutboundQueue {
         manager.ensureFlushing();
     }
 
-    /**
-     * Write some entries from the queue to the channel. Guaranteed to run
-     * in the corresponding EventLoop.
-     *
-     * @param channel Channel onto which we are writing
-     * @param now
-     * @return Number of entries written out
-     */
+    @Override
     int writeEntries(@Nonnull final Channel channel, final long now) {
         // Local cache
         StackedSegment segment = firstSegment;
@@ -247,6 +217,7 @@ final class StackedOutboundQueue implements OutboundQueue {
         return reserveEntry();
     }
 
+    @Override
     boolean pairRequest(final OfHeader message) {
         Iterator<StackedSegment> it = uncompletedSegments.iterator();
         while (it.hasNext()) {
@@ -292,66 +263,4 @@ final class StackedOutboundQueue implements OutboundQueue {
         LOG.debug("Failed to find completion for message {}", message);
         return false;
     }
-
-    long startShutdown(final Channel channel) {
-        /*
-         * We are dealing with a multi-threaded shutdown, as the user may still
-         * be reserving entries in the queue. We are executing in a netty thread,
-         * so neither flush nor barrier can be running, which is good news.
-         *
-         * We will eat up all the slots in the queue here and mark the offset first
-         * reserved offset and free up all the cached queues. We then schedule
-         * the flush task, which will deal with the rest of the shutdown process.
-         */
-        synchronized (unflushedSegments) {
-            // Increment the offset by the segment size, preventing fast path allocations,
-            // since we are holding the slow path lock, any reservations will see the queue
-            // in shutdown and fail accordingly.
-            final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
-            shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
-
-            return lockedShutdownFlush();
-        }
-    }
-
-    @GuardedBy("unflushedSegments")
-    private long lockedShutdownFlush() {
-        long entries = 0;
-
-        // Fail all queues
-        final Iterator<StackedSegment> it = uncompletedSegments.iterator();
-        while (it.hasNext()) {
-            final StackedSegment segment = it.next();
-
-            entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
-            if (segment.isComplete()) {
-                LOG.trace("Cleared segment {}", segment);
-                it.remove();
-            }
-        }
-
-        return entries;
-    }
-
-    boolean finishShutdown() {
-        synchronized (unflushedSegments) {
-            lockedShutdownFlush();
-        }
-
-        return !needsFlush();
-    }
-
-    boolean needsFlush() {
-        // flushOffset always points to the first entry, which can be changed only
-        // from Netty, so we are fine here.
-        if (firstSegment.getBaseXid() + flushOffset > lastXid) {
-            return false;
-        }
-
-        if (shutdownOffset != null && flushOffset >= shutdownOffset) {
-            return false;
-        }
-
-        return firstSegment.getEntry(flushOffset).isCommitted();
-    }
 }
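
The new superclass, AbstractStackedOutboundQueue, is referenced above but its body is not part of this diff. As rough orientation only, the sketch below shows what such a base class could plausibly contain, inferred from the members this change deletes from StackedOutboundQueue (the segment bookkeeping, the manager reference, lastXid, the shutdown and flush offsets, the constructor body, and the now-@Override'd writeEntries/pairRequest). Names, visibility and layout here are assumptions, not the actual class in openflowjava.git.

// Hypothetical sketch of the base class -- NOT shown in this diff and not
// necessarily identical to the real AbstractStackedOutboundQueue.
package org.opendaylight.openflowjava.protocol.impl.core.connection;

import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;

// 'implements OutboundQueue' is inferred from the clause removed from the subclass above.
abstract class AbstractStackedOutboundQueue implements OutboundQueue {
    // Segment bookkeeping formerly declared directly in StackedOutboundQueue.
    @GuardedBy("unflushedSegments")
    protected volatile StackedSegment firstSegment;
    @GuardedBy("unflushedSegments")
    protected final List<StackedSegment> unflushedSegments = new ArrayList<>(2);
    @GuardedBy("unflushedSegments")
    protected final List<StackedSegment> uncompletedSegments = new ArrayList<>(2);
    protected final AbstractOutboundQueueManager manager;

    protected volatile long lastXid = -1;

    @GuardedBy("unflushedSegments")
    protected Integer shutdownOffset;

    // Accessed from Netty only
    protected int flushOffset;

    AbstractStackedOutboundQueue(final AbstractOutboundQueueManager manager) {
        // Same initialization the subclass constructor used to perform before this change.
        this.manager = Preconditions.checkNotNull(manager);
        firstSegment = StackedSegment.create(0L);
        uncompletedSegments.add(firstSegment);
        unflushedSegments.add(firstSegment);
    }

    // Channel I/O and request/response pairing remain subclass-specific; the
    // "+ @Override" lines in the diff suggest these are declared up here.
    abstract int writeEntries(@Nonnull Channel channel, long now);

    abstract boolean pairRequest(OfHeader message);

    // startShutdown(), lockedShutdownFlush(), finishShutdown() and needsFlush()
    // were deleted from the subclass as well, so their shared implementations
    // would presumably live here too (omitted in this sketch).
}

Pulling the shared segment state and shutdown/flush plumbing into one abstract parent would leave StackedOutboundQueue with only its barrier-specific pieces (barrierXid, BARRIER_XID_UPDATER, allocatedXid), which is consistent with what survives the deletions in this diff.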