Barrier turn on/off - StackedOutboundQueue definition
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / StackedOutboundQueue.java
index 1da6dd3dff01203d3eaea854924c05a18e902ec6..46039904543950e1a074a0381b7d7414210f26bd 100644 (file)
@@ -11,46 +11,23 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.util.concurrent.FutureCallback;
 import io.netty.channel.Channel;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class StackedOutboundQueue implements OutboundQueue {
+final class StackedOutboundQueue extends AbstractStackedOutboundQueue {
     private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class);
     private static final AtomicLongFieldUpdater<StackedOutboundQueue> BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid");
-    private static final AtomicLongFieldUpdater<StackedOutboundQueue> LAST_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "lastXid");
-
-    @GuardedBy("unflushedSegments")
-    private volatile StackedSegment firstSegment;
-    @GuardedBy("unflushedSegments")
-    private final List<StackedSegment> unflushedSegments = new ArrayList<>(2);
-    @GuardedBy("unflushedSegments")
-    private final List<StackedSegment> uncompletedSegments = new ArrayList<>(2);
-    private final AbstractOutboundQueueManager<?> manager;
 
     private volatile long allocatedXid = -1;
     private volatile long barrierXid = -1;
-    private volatile long lastXid = -1;
 
-    @GuardedBy("unflushedSegments")
-    private Integer shutdownOffset;
-
-    // Accessed from Netty only
-    private int flushOffset;
-
-    StackedOutboundQueue(final AbstractOutboundQueueManager<?> manager) {
-        this.manager = Preconditions.checkNotNull(manager);
-        firstSegment = StackedSegment.create(0L);
-        uncompletedSegments.add(firstSegment);
-        unflushedSegments.add(firstSegment);
+    StackedOutboundQueue(final AbstractOutboundQueueManager<?, ?> manager) {
+        super(manager);
     }
 
     @GuardedBy("unflushedSegments")
@@ -163,14 +140,7 @@ final class StackedOutboundQueue implements OutboundQueue {
         manager.ensureFlushing();
     }
 
-    /**
-     * Write some entries from the queue to the channel. Guaranteed to run
-     * in the corresponding EventLoop.
-     *
-     * @param channel Channel onto which we are writing
-     * @param now
-     * @return Number of entries written out
-     */
+    @Override
     int writeEntries(@Nonnull final Channel channel, final long now) {
         // Local cache
         StackedSegment segment = firstSegment;
@@ -247,6 +217,7 @@ final class StackedOutboundQueue implements OutboundQueue {
         return reserveEntry();
     }
 
+    @Override
     boolean pairRequest(final OfHeader message) {
         Iterator<StackedSegment> it = uncompletedSegments.iterator();
         while (it.hasNext()) {
@@ -292,66 +263,4 @@ final class StackedOutboundQueue implements OutboundQueue {
         LOG.debug("Failed to find completion for message {}", message);
         return false;
     }
-
-    long startShutdown(final Channel channel) {
-        /*
-         * We are dealing with a multi-threaded shutdown, as the user may still
-         * be reserving entries in the queue. We are executing in a netty thread,
-         * so neither flush nor barrier can be running, which is good news.
-         *
-         * We will eat up all the slots in the queue here and mark the offset first
-         * reserved offset and free up all the cached queues. We then schedule
-         * the flush task, which will deal with the rest of the shutdown process.
-         */
-        synchronized (unflushedSegments) {
-            // Increment the offset by the segment size, preventing fast path allocations,
-            // since we are holding the slow path lock, any reservations will see the queue
-            // in shutdown and fail accordingly.
-            final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
-            shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
-
-            return lockedShutdownFlush();
-        }
-    }
-
-    @GuardedBy("unflushedSegments")
-    private long lockedShutdownFlush() {
-        long entries = 0;
-
-        // Fail all queues
-        final Iterator<StackedSegment> it = uncompletedSegments.iterator();
-        while (it.hasNext()) {
-            final StackedSegment segment = it.next();
-
-            entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
-            if (segment.isComplete()) {
-                LOG.trace("Cleared segment {}", segment);
-                it.remove();
-            }
-        }
-
-        return entries;
-    }
-
-    boolean finishShutdown() {
-        synchronized (unflushedSegments) {
-            lockedShutdownFlush();
-        }
-
-        return !needsFlush();
-    }
-
-    boolean needsFlush() {
-        // flushOffset always points to the first entry, which can be changed only
-        // from Netty, so we are fine here.
-        if (firstSegment.getBaseXid() + flushOffset > lastXid) {
-            return false;
-        }
-
-        if (shutdownOffset != null && flushOffset >= shutdownOffset) {
-            return false;
-        }
-
-        return firstSegment.getEntry(flushOffset).isCommitted();
-    }
 }