Implement a segmented OutboundQueue 29/21829/9
authorRobert Varga <rovarga@cisco.com>
Tue, 2 Jun 2015 19:19:57 +0000 (21:19 +0200)
committerRobert Varga <rovarga@cisco.com>
Tue, 9 Jun 2015 13:31:18 +0000 (15:31 +0200)
Internal lifecycle of a queue segment is disconnected. With this
implementation users do not observe queue changes until channel
shutdown, when the reported queue changes to null.

The reason for that is that the segments are managed internally without
the need to synchronize with the Netty thread. That synchronization is
performed only when there are multiple segments -- which is typical for
crossing the request count barrier.

Outbound queue manager is taght to not schedule a flush task it the
current state of the channel indicates that scheduling is not needed,
such as when the channel is not writable.

Change-Id: I037e27c00524e2a6ec218e3e7c1a5d6188b25ae8
Signed-off-by: Robert Varga <rovarga@cisco.com>
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueCache.java [deleted file]
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueCacheSlice.java [deleted file]
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java [deleted file]
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java [new file with mode: 0644]
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedSegment.java [new file with mode: 0644]

index ded62f57bd41844bc0e86bb99100dfc72084eb6a..cf2693c80276a4c587b3bc0f45d47af17d8078e5 100644 (file)
@@ -508,7 +508,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
             final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
         Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
 
-        final OutboundQueueManager<T> ret = new OutboundQueueManager<>(this, address, handler, OutboundQueueCache.getInstance().getSlice(maxQueueDepth), maxBarrierNanos);
+        final OutboundQueueManager<T> ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
         outputManager = ret;
         channel.pipeline().addLast(outputManager);
 
diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueCache.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueCache.java
deleted file mode 100644 (file)
index f65c5a8..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowjava.protocol.impl.core.connection;
-
-import java.util.HashMap;
-import java.util.Map;
-
-final class OutboundQueueCache {
-    private static final OutboundQueueCache INSTANCE = new OutboundQueueCache();
-
-    private final Map<Integer, OutboundQueueCacheSlice> slices = new HashMap<>();
-
-    private OutboundQueueCache() {
-
-    }
-
-    static OutboundQueueCache getInstance() {
-        return INSTANCE;
-    }
-
-    synchronized OutboundQueueCacheSlice getSlice(final int queueSize) {
-        final OutboundQueueCacheSlice oldSlice = slices.get(queueSize);
-        if (oldSlice != null) {
-            oldSlice.incRef();
-            return oldSlice;
-        }
-
-        final OutboundQueueCacheSlice newSlice = new OutboundQueueCacheSlice(queueSize);
-        slices.put(queueSize, newSlice);
-        return newSlice;
-    }
-
-    synchronized void putSlice(final OutboundQueueCacheSlice slice) {
-        if (slice.decRef()) {
-            slices.remove(slice.getQueueSize());
-        }
-    }
-}
diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueCacheSlice.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueCacheSlice.java
deleted file mode 100644 (file)
index a514589..0000000
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowjava.protocol.impl.core.connection;
-
-import com.google.common.base.FinalizableReferenceQueue;
-import com.google.common.base.FinalizableSoftReference;
-import com.google.common.base.Preconditions;
-import java.lang.ref.Reference;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-final class OutboundQueueCacheSlice {
-    private static final class QueueRef extends FinalizableSoftReference<OutboundQueueImpl> {
-        private final Collection<?> cache;
-
-        protected QueueRef(final FinalizableReferenceQueue queue, final Collection<?> cache, final OutboundQueueImpl referent) {
-            super(referent, queue);
-            this.cache = Preconditions.checkNotNull(cache);
-        }
-
-        @Override
-        public void finalizeReferent() {
-            cache.remove(this);
-        }
-    }
-
-    private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueCacheSlice.class);
-    private final FinalizableReferenceQueue refQueue = new FinalizableReferenceQueue();
-
-    private final ConcurrentLinkedDeque<QueueRef> cache = new ConcurrentLinkedDeque<>();
-    private final int queueSize;
-    private int refCount = 1;
-
-    OutboundQueueCacheSlice(final int queueSize) {
-        Preconditions.checkArgument(queueSize >= 1);
-        this.queueSize = queueSize;
-    }
-
-    void remove(final QueueRef queueRef) {
-        cache.remove(queueRef);
-    }
-
-    boolean decRef() {
-        if (--refCount == 0) {
-            refQueue.close();
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    void incRef() {
-        refCount++;
-    }
-
-    int getQueueSize() {
-        return queueSize;
-    }
-
-    OutboundQueueImpl getQueue(final OutboundQueueManager<?> manager, final long baseXid) {
-        final OutboundQueueImpl ret;
-        OutboundQueueImpl cached = null;
-        for (;;) {
-            final Reference<OutboundQueueImpl> item = cache.pollLast();
-            if (item == null) {
-                break;
-            }
-
-            cached = item.get();
-            if (cached != null) {
-                ret = cached.reuse(manager, baseXid);
-                LOG.trace("Reusing queue {} as {} on manager {}", cached, ret, manager);
-                return ret;
-            }
-        }
-
-        ret = new OutboundQueueImpl(manager, baseXid, queueSize + 1);
-        LOG.trace("Allocated new queue {} on manager {}", ret, manager);
-        return ret;
-    }
-
-    void putQueue(final OutboundQueueImpl queue) {
-        queue.retire();
-        if (cache.offer(new QueueRef(refQueue, cache, queue))) {
-            LOG.trace("Saving queue {} for later reuse", queue);
-        } else {
-            LOG.trace("Queue {} thrown away", queue);
-        }
-    }
-}
diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java
deleted file mode 100644 (file)
index c4feb26..0000000
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowjava.protocol.impl.core.connection;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
-import com.google.common.util.concurrent.FutureCallback;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import javax.annotation.Nonnull;
-import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-final class OutboundQueueImpl implements OutboundQueue {
-    private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class);
-    private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> CURRENT_OFFSET_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
-    private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> BARRIER_OFFSET_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "barrierOffset");
-    private final OutboundQueueManager<?> manager;
-    private final OutboundQueueEntry[] queue;
-    private final long baseXid;
-    private final long endXid;
-    private final int reserve;
-
-    // Updated concurrently
-    private volatile int barrierOffset = -1;
-    private volatile int reserveOffset = 0;
-
-    // Updated from Netty only
-    private int flushOffset;
-    private int completeCount;
-    private int lastBarrierOffset = -1;
-
-    OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final int maxQueue) {
-        /*
-         * We use the last entry as an emergency should a timeout-triggered
-         * flush request race with normal users for the last entry in this
-         * queue. In that case the flush request will take the last entry and
-         * schedule a flush, which means that we will get around sending the
-         * message as soon as the user finishes the reservation.
-         */
-        Preconditions.checkArgument(maxQueue > 1);
-        this.baseXid = baseXid;
-        this.endXid = baseXid + maxQueue;
-        this.reserve = maxQueue - 1;
-        this.manager = Preconditions.checkNotNull(manager);
-        queue = new OutboundQueueEntry[maxQueue];
-        for (int i = 0; i < maxQueue; ++i) {
-            queue[i] = new OutboundQueueEntry();
-        }
-    }
-
-    private OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final OutboundQueueEntry[] queue) {
-        this.manager = Preconditions.checkNotNull(manager);
-        this.queue = Preconditions.checkNotNull(queue);
-        this.baseXid = baseXid;
-        this.endXid = baseXid + queue.length;
-        this.reserve = queue.length - 1;
-    }
-
-    void retire() {
-        for (OutboundQueueEntry element : queue) {
-            element.reset();
-        }
-    }
-
-    OutboundQueueImpl reuse(final OutboundQueueManager<?> manager, final long baseXid) {
-        return new OutboundQueueImpl(manager, baseXid, queue);
-    }
-
-    @Override
-    public Long reserveEntry() {
-        return reserveEntry(false);
-    }
-
-    @Override
-    public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
-        final int offset = (int)(xid - baseXid);
-        if (message != null) {
-            Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
-        }
-
-        final int ro = reserveOffset;
-        Preconditions.checkArgument(offset < ro, "Unexpected commit to offset %s reserved %s message %s", offset, ro, message);
-
-        final OutboundQueueEntry entry = queue[offset];
-        entry.commit(message, callback);
-        LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, ro);
-
-        if (entry.isBarrier()) {
-            int my = offset;
-            for (;;) {
-                final int prev = BARRIER_OFFSET_UPDATER.getAndSet(this, my);
-                if (prev < my) {
-                    LOG.debug("Queue {} recorded pending barrier offset {}", this, my);
-                    break;
-                }
-
-                // We have traveled back, recover
-                LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
-                my = prev;
-            }
-        }
-
-        manager.ensureFlushing(this);
-    }
-
-    private Long reserveEntry(final boolean forBarrier) {
-        final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this);
-        if (offset >= reserve) {
-            if (forBarrier) {
-                LOG.debug("Queue {} offset {}/{}, using emergency slot", this, offset, queue.length);
-                return endXid;
-            } else {
-                LOG.debug("Queue {} offset {}/{}, not allowing reservation", this, offset, queue.length);
-                return null;
-            }
-        }
-
-        final Long xid = baseXid + offset;
-        LOG.debug("Queue {} allocated XID {} at offset {}", this, xid, offset);
-        return xid;
-    }
-
-    Long reserveBarrierIfNeeded() {
-        final int bo = barrierOffset;
-        if (bo >= flushOffset) {
-            LOG.debug("Barrier found at offset {} (currently at {})", bo, flushOffset);
-            return null;
-        } else {
-            return reserveEntry(true);
-        }
-    }
-
-    int startShutdown() {
-        // Increment the offset by the queue size, hence preventing any normal
-        // allocations. We should not be seeing a barrier reservation after this
-        // and if there is one issued, we can disregard it.
-        final int offset = CURRENT_OFFSET_UPDATER.getAndAdd(this, queue.length);
-
-        // If this offset is larger than reserve, trim it. That is not an accurate
-        // view of which slot was actually "reserved", but it indicates at which
-        // entry we can declare the queue flushed (e.g. at the emergency slot).
-        return offset > reserve ? reserve : offset;
-    }
-
-    boolean isShutdown(final int offset) {
-        // This queue is shutdown if the flushOffset (e.g. the next entry to
-        // be flushed) points to the offset 'reserved' in startShutdown()
-        return flushOffset >= offset;
-    }
-
-    /**
-     * An empty queue is a queue which has no further unflushed entries.
-     *
-     * @return True if this queue does not have unprocessed entries.
-     */
-    private boolean isEmpty() {
-        int ro = reserveOffset;
-        if (ro >= reserve) {
-            if (queue[reserve].isCommitted()) {
-                ro = reserve + 1;
-            } else {
-                ro = reserve;
-            }
-        }
-
-        LOG.debug("Effective flush/reserve offset {}/{}", flushOffset, ro);
-        return ro <= flushOffset;
-    }
-
-    /**
-     * A queue is finished when all of its entries have been completed.
-     *
-     * @return False if there are any uncompleted requests.
-     */
-    boolean isFinished() {
-        if (completeCount < reserve) {
-            return false;
-        }
-
-        // We need to check if the last entry was used
-        final OutboundQueueEntry last = queue[reserve];
-        return !last.isCommitted() || last.isCompleted();
-    }
-
-    boolean isFlushed() {
-        LOG.debug("Check queue {} for completeness (offset {}, reserve {})", this, flushOffset, reserve);
-        if (flushOffset < reserve) {
-            return false;
-        }
-
-        // flushOffset implied == reserve
-        return flushOffset >= queue.length || !queue[reserve].isCommitted();
-    }
-
-    boolean needsFlush() {
-        if (flushOffset < reserve) {
-            return queue[flushOffset].isCommitted();
-        }
-
-        if (isFlushed()) {
-            LOG.trace("Queue {} is flushed, schedule a replace", this);
-            return true;
-        }
-        if (isFinished()) {
-            LOG.trace("Queue {} is finished, schedule a cleanup", this);
-            return true;
-        }
-
-        return false;
-    }
-
-    OfHeader flushEntry() {
-        for (;;) {
-            // No message ready
-            if (isEmpty()) {
-                LOG.trace("Flushed all reserved entries up to {}", flushOffset);
-                return null;
-            }
-
-            final OutboundQueueEntry entry = queue[flushOffset];
-            if (!entry.isCommitted()) {
-                LOG.trace("Request at offset {} not ready yet, giving up", flushOffset);
-                return null;
-            }
-
-            final OfHeader msg = entry.takeMessage();
-            flushOffset++;
-            if (msg != null) {
-                return msg;
-            }
-
-            LOG.trace("Null message, skipping to offset {}", flushOffset);
-        }
-    }
-
-    // Argument is 'long' to explicitly convert before performing operations
-    private boolean xidInRange(final long xid) {
-        return xid < endXid && (xid >= baseXid || baseXid > endXid);
-    }
-
-    private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) {
-        if (response instanceof Error) {
-            final Error err = (Error)response;
-            LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(), err.getCodeString());
-            entry.fail(new DeviceRequestFailedException("Device-side failure", err));
-            return true;
-        } else {
-            return entry.complete(response);
-        }
-    }
-
-    /**
-     * Return the request entry corresponding to a response. Returns null
-     * if there is no request matching the response.
-     *
-     * @param response Response message
-     * @return Matching request entry, or null if no match is found.
-     */
-    OutboundQueueEntry pairRequest(@Nonnull final OfHeader response) {
-        final Long xid = response.getXid();
-        if (!xidInRange(xid)) {
-            LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, queue.length, xid);
-            return null;
-        }
-
-        final int offset = (int)(xid - baseXid);
-        final OutboundQueueEntry entry = queue[offset];
-        if (entry.isCompleted()) {
-            LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
-            return null;
-        }
-
-        if (entry.isBarrier()) {
-            // This has been a barrier -- make sure we complete all preceding requests.
-            // XXX: Barriers are expected to complete in one message.
-            //      If this assumption is changed, this logic will need to be expanded
-            //      to ensure that the requests implied by the barrier are reported as
-            //      completed *after* the barrier.
-            LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
-            completeRequests(offset);
-            lastBarrierOffset = offset;
-
-            final boolean success = completeEntry(entry, response);
-            Verify.verify(success, "Barrier request failed to complete");
-            completeCount++;
-        } else if (completeEntry(entry, response)) {
-            completeCount++;
-        }
-
-        return entry;
-    }
-
-    private void completeRequests(final int toOffset) {
-        for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
-            final OutboundQueueEntry entry = queue[i];
-            if (!entry.isCompleted() && entry.complete(null)) {
-                completeCount++;
-            }
-        }
-    }
-
-    void completeAll() {
-        completeRequests(queue.length);
-    }
-
-    int failAll(final OutboundQueueException cause) {
-        int ret = 0;
-        for (int i = lastBarrierOffset + 1; i < queue.length; ++i) {
-            final OutboundQueueEntry entry = queue[i];
-            if (!entry.isCommitted()) {
-                break;
-            }
-
-            if (!entry.isCompleted()) {
-                entry.fail(cause);
-                ret++;
-            }
-        }
-
-        return ret;
-    }
-}
index 66fa8a60ca799fe9772bfd5db0e65bceb0f43382..dc378164dc1685a4442424fed0d49409209d8f7c 100644 (file)
@@ -11,12 +11,8 @@ import com.google.common.base.Preconditions;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import java.net.InetSocketAddress;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Queue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
@@ -29,20 +25,6 @@ import org.slf4j.LoggerFactory;
 final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
 
-    /**
-     * This is the default upper bound we place on the flush task running
-     * a single iteration. We relinquish control after about this amount
-     * of time.
-     */
-    private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
-
-    /**
-     * We re-check the time spent flushing every this many messages. We do this because
-     * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
-     * or similar to disable the feature.
-     */
-    private static final int WORKTIME_RECHECK_MSGS = 64;
-
     /**
      * Default low write watermark. Channel will become writable when number of outstanding
      * bytes dips below this value.
@@ -55,23 +37,23 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
      */
     private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
 
-
-    private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
     private final AtomicBoolean flushScheduled = new AtomicBoolean();
+    private final StackedOutboundQueue currentQueue;
     private final ConnectionAdapterImpl parent;
     private final InetSocketAddress address;
+    private final int maxNonBarrierMessages;
     private final long maxBarrierNanos;
-    private final long maxWorkTime;
     private final T handler;
 
+    // Accessed concurrently
+    private volatile boolean reading;
+
     // Updated from netty only
-    private long lastBarrierNanos = System.nanoTime();
-    private OutboundQueueCacheSlice slice;
-    private OutboundQueueImpl currentQueue;
+    private boolean alreadyReading;
     private boolean barrierTimerEnabled;
+    private long lastBarrierNanos = System.nanoTime();
     private int nonBarrierMessages;
-    private long lastXid = 0;
-    private Integer shutdownOffset;
+    private boolean shuttingDown;
 
     // Passed to executor to request triggering of flush
     private final Runnable flushRunnable = new Runnable() {
@@ -90,17 +72,18 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     };
 
     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
-        final OutboundQueueCacheSlice slice, final long maxBarrierNanos) {
+        final int maxNonBarrierMessages, final long maxBarrierNanos) {
         this.parent = Preconditions.checkNotNull(parent);
         this.handler = Preconditions.checkNotNull(handler);
-        this.slice = Preconditions.checkNotNull(slice);
+        Preconditions.checkArgument(maxNonBarrierMessages > 0);
+        this.maxNonBarrierMessages = maxNonBarrierMessages;
         Preconditions.checkArgument(maxBarrierNanos > 0);
         this.maxBarrierNanos = maxBarrierNanos;
         this.address = address;
-        this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
 
-        LOG.debug("Queue manager instantiated with queue slice {}", slice);
-        createQueue();
+        currentQueue = new StackedOutboundQueue(this);
+        LOG.debug("Queue manager instantiated with queue {}", currentQueue);
+        handler.onConnectionQueueChanged(currentQueue);
     }
 
     T getHandler() {
@@ -110,20 +93,6 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     @Override
     public void close() {
         handler.onConnectionQueueChanged(null);
-        if (slice != null) {
-            slice.decRef();
-            slice = null;
-        }
-    }
-
-    private void createQueue() {
-        final long baseXid = lastXid;
-        lastXid += slice.getQueueSize() + 1;
-
-        final OutboundQueueImpl queue = slice.getQueue(this, baseXid);
-        activeQueues.add(queue);
-        currentQueue = queue;
-        handler.onConnectionQueueChanged(queue);
     }
 
     private void scheduleBarrierTimer(final long now) {
@@ -150,45 +119,6 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         LOG.trace("Barrier XID {} scheduled", xid);
     }
 
-    /**
-     * Flush an entry from the queue.
-     *
-     * @param now Time reference for 'now'. We take this as an argument, as
-     *            we need a timestamp to mark barrier messages we see swinging
-     *            by. That timestamp does not need to be completely accurate,
-     *            hence we use the flush start time. Alternative would be to
-     *            measure System.nanoTime() for each barrier -- needlessly
-     *            adding overhead.
-     *
-     * @return Entry which was flushed, null if no entry is ready.
-     */
-    OfHeader flushEntry(final long now) {
-        final OfHeader message = currentQueue.flushEntry();
-        if (currentQueue.isFlushed()) {
-            LOG.debug("Queue {} is fully flushed", currentQueue);
-            createQueue();
-        }
-
-        if (message == null) {
-            return null;
-        }
-
-        if (message instanceof BarrierInput) {
-            LOG.trace("Barrier message seen, resetting counters");
-            nonBarrierMessages = 0;
-            lastBarrierNanos = now;
-        } else {
-            nonBarrierMessages++;
-            if (nonBarrierMessages >= slice.getQueueSize()) {
-                LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
-                scheduleBarrierMessage();
-            } else if (!barrierTimerEnabled) {
-                scheduleBarrierTimer(now);
-            }
-        }
-
-        return message;
-    }
 
     /**
      * Invoked whenever a message comes in from the switch. Runs matching
@@ -200,50 +130,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     boolean onMessage(final OfHeader message) {
         LOG.trace("Attempting to pair message {} to a request", message);
 
-        Iterator<OutboundQueueImpl> it = activeQueues.iterator();
-        while (it.hasNext()) {
-            final OutboundQueueImpl queue = it.next();
-            final OutboundQueueEntry entry = queue.pairRequest(message);
-
-            if (entry == null) {
-                continue;
-            }
-
-            LOG.trace("Queue {} accepted response {}", queue, message);
-
-            // This has been a barrier request, we need to flush all
-            // previous queues
-            if (entry.isBarrier() && activeQueues.size() > 1) {
-                LOG.trace("Queue {} indicated request was a barrier", queue);
-
-                it = activeQueues.iterator();
-                while (it.hasNext()) {
-                    final OutboundQueueImpl q = it.next();
-
-                    // We want to complete all queues before the current one, we will
-                    // complete the current queue below
-                    if (!queue.equals(q)) {
-                        LOG.trace("Queue {} is implied finished", q);
-                        q.completeAll();
-                        it.remove();
-                        slice.putQueue(q);
-                    } else {
-                        break;
-                    }
-                }
-            }
-
-            if (queue.isFinished()) {
-                LOG.trace("Queue {} is finished", queue);
-                it.remove();
-                slice.putQueue(queue);
-            }
-
-            return true;
-        }
-
-        LOG.debug("Failed to find completion for message {}", message);
-        return false;
+        return currentQueue.pairRequest(message);
     }
 
     private void scheduleFlush() {
@@ -255,18 +142,13 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         }
     }
 
-    void ensureFlushing(final OutboundQueueImpl queue) {
-        Preconditions.checkState(currentQueue.equals(queue));
-        scheduleFlush();
-    }
-
     /**
      * Periodic barrier check.
      */
     protected void barrier() {
         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
         barrierTimerEnabled = false;
-        if (shutdownOffset != null) {
+        if (shuttingDown) {
             LOG.trace("Channel shut down, not processing barrier");
             return;
         }
@@ -301,31 +183,19 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         conditionalFlush();
     }
 
-    private void shutdownFlush() {
-        long entries = 0;
-
-        // Fail all queues
-        final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
-        while (it.hasNext()) {
-            final OutboundQueueImpl queue = it.next();
+    private void writeAndFlush() {
+        final long start = System.nanoTime();
 
-            entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
-            if (queue.isFinished()) {
-                LOG.trace("Cleared queue {}", queue);
-                it.remove();
-            }
+        final int entries = currentQueue.writeEntries(parent.getChannel(), start);
+        if (entries > 0) {
+            LOG.debug("Flushing channel {}", parent.getChannel());
+            parent.getChannel().flush();
         }
 
-        LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
-
-        Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet");
-        if (currentQueue.isShutdown(shutdownOffset)) {
-            currentQueue = null;
-            handler.onConnectionQueueChanged(null);
-            LOG.debug("Channel {} shutdown complete", parent.getChannel());
-        } else {
-            LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
-            rescheduleFlush();
+        if (LOG.isDebugEnabled()) {
+            final long stop = System.nanoTime();
+            LOG.debug("Flushed {} messages to channel {} in {}us", entries,
+                parent.getChannel(), TimeUnit.NANOSECONDS.toMicros(stop - start));
         }
     }
 
@@ -336,61 +206,17 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
      */
     protected void flush() {
         // If the channel is gone, just flush whatever is not completed
-        if (shutdownOffset != null) {
-            shutdownFlush();
-            return;
-        }
-
-        final long start = System.nanoTime();
-        final long deadline = start + maxWorkTime;
-
-        LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
-
-        long messages = 0;
-        for (;; ++messages) {
-            if (!parent.getChannel().isWritable()) {
-                LOG.debug("Channel {} is no longer writable", parent.getChannel());
-                break;
-            }
-
-            final OfHeader message = flushEntry(start);
-            if (message == null) {
-                LOG.trace("The queue is completely drained");
-                break;
-            }
-
-            final Object wrapper;
-            if (address == null) {
-                wrapper = new MessageListenerWrapper(message, null);
-            } else {
-                wrapper = new UdpMessageListenerWrapper(message, null, address);
-            }
-            parent.getChannel().write(wrapper);
-
-            /*
-             * Check every WORKTIME_RECHECK_MSGS for exceeded time.
-             *
-             * XXX: given we already measure our flushing throughput, we
-             *      should be able to perform dynamic adjustments here.
-             *      is that additional complexity needed, though?
-             */
-            if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
-                LOG.trace("Exceeded allotted work time {}us",
-                        TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
-                break;
-            }
-        }
-
-        if (messages > 0) {
-            LOG.debug("Flushing {} message(s) to channel {}", messages, parent.getChannel());
-            parent.getChannel().flush();
+        if (!shuttingDown) {
+            LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
+            writeAndFlush();
+            rescheduleFlush();
+        } else if (currentQueue.finishShutdown()) {
+            handler.onConnectionQueueChanged(null);
+            LOG.debug("Channel {} shutdown complete", parent.getChannel());
+        } else {
+            LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
+            rescheduleFlush();
         }
-
-        final long stop = System.nanoTime();
-        LOG.debug("Flushed {} messages in {}us to channel {}",
-                messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
-
-        rescheduleFlush();
     }
 
     /**
@@ -399,7 +225,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
      */
     private void conditionalFlush() {
         if (currentQueue.needsFlush()) {
-            if (shutdownOffset != null || parent.getChannel().isWritable()) {
+            if (shuttingDown || parent.getChannel().isWritable()) {
                 scheduleFlush();
             } else {
                 LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
@@ -409,15 +235,10 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         }
     }
 
-    private void conditionalFlush(final ChannelHandlerContext ctx) {
-        Preconditions.checkState(ctx.channel().equals(parent.getChannel()), "Inconsistent channel %s with context %s", parent.getChannel(), ctx);
-        conditionalFlush();
-    }
-
     @Override
     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
         super.channelActive(ctx);
-        conditionalFlush(ctx);
+        conditionalFlush();
     }
 
     public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
@@ -437,6 +258,8 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
         super.channelWritabilityChanged(ctx);
 
+        // A simple trade-off. While we could write things right away, if there is a task
+        // schedule, let it have the work
         if (flushScheduled.compareAndSet(false, true)) {
             LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
             flush();
@@ -449,34 +272,113 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
 
-        LOG.debug("Channel {} initiating shutdown...", parent.getChannel());
+        LOG.debug("Channel {} initiating shutdown...", ctx.channel());
 
-        /*
-         * We are dealing with a multi-threaded shutdown, as the user may still
-         * be reserving entries in the queue. We are executing in a netty thread,
-         * so neither flush nor barrier can be running, which is good news.
-         *
-         * We will eat up all the slots in the queue here and mark the offset first
-         * reserved offset and free up all the cached queues. We then schedule
-         * the flush task, which will deal with the rest of the shutdown process.
-         */
-        shutdownOffset = currentQueue.startShutdown();
-        if (slice != null) {
-            slice.decRef();
-            slice = null;
-        }
+        shuttingDown = true;
+        final long entries = currentQueue.startShutdown(ctx.channel());
+        LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
 
-        LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
         scheduleFlush();
     }
 
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
+        // non-volatile read if we are called multiple times
+        if (!alreadyReading) {
+            alreadyReading = true;
+            reading = true;
+        }
+        super.channelRead(ctx, msg);
+    }
+
+    @Override
+    public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
+        super.channelReadComplete(ctx);
+        alreadyReading = false;
+        reading = false;
+
+        // TODO: model this as an atomic gate. We need to sync on it to make sure
+        //       that ensureFlushing() suppresses scheudling only if this barrier
+        //       has not been crossed.
+        synchronized (this) {
+            // Run flush regardless of writability. This is not strictly required, as
+            // there may be a scheduled flush. Instead of canceling it, which is expensive,
+            // we'll steal its work. Note that more work may accumulate in the time window
+            // between now and when the task will run, so it may not be a no-op after all.
+            //
+            // The reason for this is to will the output buffer before we go into selection
+            // phase. This will make sure the pipe is full (in which case our next wake up
+            // will be the queue becoming writable).
+            writeAndFlush();
+        }
+
+        LOG.debug("Opportunistic write on channel {}", parent.getChannel());
+        writeAndFlush();
+    }
+
     @Override
     public String toString() {
         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
     }
 
+    void ensureFlushing() {
+        // If the channel is not writable, there's no point in waking up,
+        // once we become writable, we will run a full flush
+        if (!parent.getChannel().isWritable()) {
+            return;
+        }
+
+        // We are currently reading something, just a quick sync to ensure we will in fact
+        // flush state.
+        if (reading) {
+            synchronized (this) {
+                if (reading) {
+                    return;
+                }
+            }
+        }
+
+        // Netty thread is outside our code, we need to schedule a flush
+        // to re-synchronize.
+        scheduleFlush();
+    }
+
     void onEchoRequest(final EchoRequestMessage message) {
         final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()).setVersion(message.getVersion()).setXid(message.getXid()).build();
         parent.getChannel().writeAndFlush(reply);
     }
+
+    /**
+     * Write a message into the underlying channel.
+     *
+     * @param now Time reference for 'now'. We take this as an argument, as
+     *            we need a timestamp to mark barrier messages we see swinging
+     *            by. That timestamp does not need to be completely accurate,
+     *            hence we use the flush start time. Alternative would be to
+     *            measure System.nanoTime() for each barrier -- needlessly
+     *            adding overhead.
+     */
+    void writeMessage(final OfHeader message, final long now) {
+        final Object wrapper;
+        if (address == null) {
+            wrapper = new MessageListenerWrapper(message, null);
+        } else {
+            wrapper = new UdpMessageListenerWrapper(message, null, address);
+        }
+        parent.getChannel().write(wrapper);
+
+        if (message instanceof BarrierInput) {
+            LOG.trace("Barrier message seen, resetting counters");
+            nonBarrierMessages = 0;
+            lastBarrierNanos = now;
+        } else {
+            nonBarrierMessages++;
+            if (nonBarrierMessages >= maxNonBarrierMessages) {
+                LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
+                scheduleBarrierMessage();
+            } else if (!barrierTimerEnabled) {
+                scheduleBarrierTimer(now);
+            }
+        }
+    }
 }
diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java
new file mode 100644 (file)
index 0000000..3c3ceee
--- /dev/null
@@ -0,0 +1,351 @@
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.util.concurrent.FutureCallback;
+import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class StackedOutboundQueue implements OutboundQueue {
+    private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class);
+    private static final AtomicLongFieldUpdater<StackedOutboundQueue> BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid");
+    private static final AtomicLongFieldUpdater<StackedOutboundQueue> LAST_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "lastXid");
+
+    @GuardedBy("unflushedSegments")
+    private volatile StackedSegment firstSegment;
+    @GuardedBy("unflushedSegments")
+    private final List<StackedSegment> unflushedSegments = new ArrayList<>(2);
+    @GuardedBy("unflushedSegments")
+    private final List<StackedSegment> uncompletedSegments = new ArrayList<>(2);
+    private final OutboundQueueManager<?> manager;
+
+    private volatile long lastXid = -1;
+    private volatile long barrierXid = -1;
+
+    @GuardedBy("unflushedSegments")
+    private Integer shutdownOffset;
+
+    // Accessed from Netty only
+    private int flushOffset;
+
+    StackedOutboundQueue(final OutboundQueueManager<?> manager) {
+        this.manager = Preconditions.checkNotNull(manager);
+        firstSegment = StackedSegment.create(0L);
+        uncompletedSegments.add(firstSegment);
+        unflushedSegments.add(firstSegment);
+    }
+
+    @GuardedBy("unflushedSegments")
+    private void ensureSegment(final StackedSegment first, final int offset) {
+        final int segmentOffset = offset / StackedSegment.SEGMENT_SIZE;
+        LOG.debug("Queue {} slow offset {} maps to {} segments {}", this, offset, segmentOffset, unflushedSegments.size());
+
+        for (int i = unflushedSegments.size(); i <= segmentOffset; ++i) {
+            final StackedSegment newSegment = StackedSegment.create(first.getBaseXid() + (StackedSegment.SEGMENT_SIZE * i));
+            LOG.debug("Adding segment {}", newSegment);
+            unflushedSegments.add(newSegment);
+        }
+    }
+
+    /*
+     * This method is expected to be called from multiple threads concurrently.
+     */
+    @Override
+    public Long reserveEntry() {
+        final long xid = LAST_XID_UPDATER.incrementAndGet(this);
+        final StackedSegment fastSegment = firstSegment;
+
+        if (xid >= fastSegment.getBaseXid() + StackedSegment.SEGMENT_SIZE) {
+            LOG.debug("Queue {} falling back to slow reservation for XID {}", this, xid);
+
+            // Multiple segments, this a slow path
+            synchronized (unflushedSegments) {
+                LOG.debug("Queue {} executing slow reservation for XID {}", this, xid);
+
+                // Shutdown was scheduled, need to fail the reservation
+                if (shutdownOffset != null) {
+                    LOG.debug("Queue {} is being shutdown, failing reservation", this);
+                    return null;
+                }
+
+                // Ensure we have the appropriate segment for the specified XID
+                final StackedSegment slowSegment = firstSegment;
+                final int slowOffset = (int) (xid - slowSegment.getBaseXid());
+                Verify.verify(slowOffset >= 0);
+
+                // Now, we let's see if we need to allocate a new segment
+                ensureSegment(slowSegment, slowOffset);
+
+                LOG.debug("Queue {} slow reservation finished", this);
+            }
+        }
+
+        LOG.trace("Queue {} allocated XID {}", this, xid);
+        return xid;
+    }
+
+    /*
+     * This method is expected to be called from multiple threads concurrently
+     */
+    @Override
+    public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
+        final StackedSegment fastSegment = firstSegment;
+        final long calcOffset = xid - fastSegment.getBaseXid();
+        Preconditions.checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s", xid, fastSegment.getBaseXid());
+
+        Verify.verify(calcOffset <= Integer.MAX_VALUE);
+        final int fastOffset = (int) calcOffset;
+
+        final OutboundQueueEntry entry;
+        if (fastOffset >= StackedSegment.SEGMENT_SIZE) {
+            LOG.debug("Queue {} falling back to slow commit of XID {} at offset {}", this, xid, fastOffset);
+
+            final StackedSegment segment;
+            final int slowOffset;
+            synchronized (unflushedSegments) {
+                final StackedSegment slowSegment = firstSegment;
+                final long slowCalcOffset = xid - slowSegment.getBaseXid();
+                Verify.verify(slowCalcOffset >= 0 && slowCalcOffset <= Integer.MAX_VALUE);
+                slowOffset = (int) slowCalcOffset;
+
+                LOG.debug("Queue {} recalculated offset of XID {} to {}", this, xid, slowOffset);
+                segment = unflushedSegments.get(slowOffset / StackedSegment.SEGMENT_SIZE);
+            }
+
+            final int segOffset = slowOffset % StackedSegment.SEGMENT_SIZE;
+            entry = segment.getEntry(segOffset);
+            LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this, xid, slowOffset, segment, segOffset);
+        } else {
+            entry = fastSegment.getEntry(fastOffset);
+        }
+
+        entry.commit(message, callback);
+        if (entry.isBarrier()) {
+            long my = xid;
+            for (;;) {
+                final long prev = BARRIER_XID_UPDATER.getAndSet(this, my);
+                if (prev < my) {
+                    LOG.debug("Queue {} recorded pending barrier XID {}", this, my);
+                    break;
+                }
+
+                // We have traveled back, recover
+                LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
+                my = prev;
+            }
+        }
+
+        LOG.trace("Queue {} committed XID {}", this, xid);
+        manager.ensureFlushing();
+    }
+
+    /**
+     * Write some entries from the queue to the channel. Guaranteed to run
+     * in the corresponding EventLoop.
+     *
+     * @param channel Channel onto which we are writing
+     * @param now
+     * @return Number of entries written out
+     */
+    int writeEntries(@Nonnull final Channel channel, final long now) {
+        // Local cache
+        StackedSegment segment = firstSegment;
+        int entries = 0;
+
+        while (channel.isWritable()) {
+            final OutboundQueueEntry entry = segment.getEntry(flushOffset);
+            if (!entry.isCommitted()) {
+                LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid() + flushOffset, segment, flushOffset);
+                break;
+            }
+
+            LOG.trace("Queue {} flushing entry at offset {}", this, flushOffset);
+            final OfHeader message = entry.takeMessage();
+            flushOffset++;
+            entries++;
+
+            if (message != null) {
+                manager.writeMessage(message, now);
+            } else {
+                entry.complete(null);
+            }
+
+            if (flushOffset >= StackedSegment.SEGMENT_SIZE) {
+                /*
+                 * Slow path: purge the current segment unless it's the last one.
+                 * If it is, we leave it for replacement when a new reservation
+                 * is run on it.
+                 *
+                 * This costs us two slow paths, but hey, this should be very rare,
+                 * so let's keep things simple.
+                 */
+                synchronized (unflushedSegments) {
+                    LOG.debug("Flush offset {} unflushed segments {}", flushOffset, unflushedSegments.size());
+
+                    // We may have raced ahead of reservation code and need to allocate a segment
+                    ensureSegment(segment, flushOffset);
+
+                    // Remove the segment, update the firstSegment and reset flushOffset
+                    final StackedSegment oldSegment = unflushedSegments.remove(0);
+                    if (oldSegment.isComplete()) {
+                        uncompletedSegments.remove(oldSegment);
+                        oldSegment.recycle();
+                    }
+
+                    // Reset the first segment and add it to the uncompleted list
+                    segment = unflushedSegments.get(0);
+                    uncompletedSegments.add(segment);
+
+                    // Update the shutdown offset
+                    if (shutdownOffset != null) {
+                        shutdownOffset -= StackedSegment.SEGMENT_SIZE;
+                    }
+
+                    // Allow reservations back on the fast path by publishing the new first segment
+                    firstSegment = segment;
+
+                    flushOffset = 0;
+                    LOG.debug("Queue {} flush moved to segment {}", this, segment);
+                }
+            }
+        }
+
+        return entries;
+    }
+
+    Long reserveBarrierIfNeeded() {
+        final long bXid = barrierXid;
+        final long fXid = firstSegment.getBaseXid() + flushOffset;
+        if (bXid >= fXid) {
+            LOG.debug("Barrier found at XID {} (currently at {})", bXid, fXid);
+            return null;
+        } else {
+            return reserveEntry();
+        }
+    }
+
+    boolean pairRequest(final OfHeader message) {
+        Iterator<StackedSegment> it = uncompletedSegments.iterator();
+        while (it.hasNext()) {
+            final StackedSegment queue = it.next();
+            final OutboundQueueEntry entry = queue.pairRequest(message);
+            if (entry == null) {
+                continue;
+            }
+
+            LOG.trace("Queue {} accepted response {}", queue, message);
+
+            // This has been a barrier request, we need to flush all
+            // previous queues
+            if (entry.isBarrier() && uncompletedSegments.size() > 1) {
+                LOG.trace("Queue {} indicated request was a barrier", queue);
+
+                it = uncompletedSegments.iterator();
+                while (it.hasNext()) {
+                    final StackedSegment q = it.next();
+
+                    // We want to complete all queues before the current one, we will
+                    // complete the current queue below
+                    if (!queue.equals(q)) {
+                        LOG.trace("Queue {} is implied finished", q);
+                        q.completeAll();
+                        it.remove();
+                        q.recycle();
+                    } else {
+                        break;
+                    }
+                }
+            }
+
+            if (queue.isComplete()) {
+                LOG.trace("Queue {} is finished", queue);
+                it.remove();
+                queue.recycle();
+            }
+
+            return true;
+        }
+
+        LOG.debug("Failed to find completion for message {}", message);
+        return false;
+    }
+
+    long startShutdown(final Channel channel) {
+        /*
+         * We are dealing with a multi-threaded shutdown, as the user may still
+         * be reserving entries in the queue. We are executing in a netty thread,
+         * so neither flush nor barrier can be running, which is good news.
+         *
+         * We will eat up all the slots in the queue here and mark the offset first
+         * reserved offset and free up all the cached queues. We then schedule
+         * the flush task, which will deal with the rest of the shutdown process.
+         */
+        synchronized (unflushedSegments) {
+            // Increment the offset by the segment size, preventing fast path allocations,
+            // since we are holding the slow path lock, any reservations will see the queue
+            // in shutdown and fail accordingly.
+            final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
+            shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
+
+            return lockedShutdownFlush();
+        }
+    }
+
+    @GuardedBy("unflushedSegments")
+    private long lockedShutdownFlush() {
+        long entries = 0;
+
+        // Fail all queues
+        final Iterator<StackedSegment> it = uncompletedSegments.iterator();
+        while (it.hasNext()) {
+            final StackedSegment segment = it.next();
+
+            entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
+            if (segment.isComplete()) {
+                LOG.trace("Cleared segment {}", segment);
+                it.remove();
+            }
+        }
+
+        return entries;
+    }
+
+    boolean finishShutdown() {
+        synchronized (unflushedSegments) {
+            lockedShutdownFlush();
+        }
+
+        return !needsFlush();
+    }
+
+    boolean needsFlush() {
+        // flushOffset always points to the first entry, which can be changed only
+        // from Netty, so we are fine here.
+        if (firstSegment.getBaseXid() + flushOffset > lastXid) {
+            return false;
+        }
+
+        if (shutdownOffset != null && flushOffset >= shutdownOffset) {
+            return false;
+        }
+
+        return firstSegment.getEntry(flushOffset).isCommitted();
+    }
+}
diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedSegment.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedSegment.java
new file mode 100644 (file)
index 0000000..23bdb0b
--- /dev/null
@@ -0,0 +1,189 @@
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import com.google.common.base.FinalizableReferenceQueue;
+import com.google.common.base.FinalizableSoftReference;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import java.lang.ref.Reference;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class StackedSegment {
+    private static final class QueueRef extends FinalizableSoftReference<OutboundQueueEntry[]> {
+        QueueRef(final FinalizableReferenceQueue queue, final OutboundQueueEntry[] referent) {
+            super(referent, queue);
+        }
+
+        @Override
+        public void finalizeReferent() {
+            CACHE.remove(this);
+        }
+    }
+
+    /**
+     * Size of each individual segment
+     */
+    static final int SEGMENT_SIZE = 4096;
+
+    private static final Logger LOG = LoggerFactory.getLogger(StackedSegment.class);
+    private static final FinalizableReferenceQueue REF_QUEUE = new FinalizableReferenceQueue();
+    private static final ConcurrentLinkedDeque<QueueRef> CACHE = new ConcurrentLinkedDeque<>();
+
+    private final OutboundQueueEntry[] entries;
+    private final long baseXid;
+    private final long endXid;
+
+    // Updated from netty only
+    private int lastBarrierOffset = -1;
+    private int completeCount;
+
+    StackedSegment(final long baseXid, final OutboundQueueEntry[] entries) {
+        this.baseXid = baseXid;
+        this.endXid = baseXid + SEGMENT_SIZE;
+        this.entries = Preconditions.checkNotNull(entries);
+    }
+
+    static StackedSegment create(final long baseXid) {
+        final StackedSegment ret;
+        for (;;) {
+            final Reference<OutboundQueueEntry[]> item = CACHE.pollLast();
+            if (item == null) {
+                break;
+            }
+
+            final OutboundQueueEntry[] cached = item.get();
+            if (cached != null) {
+                ret = new StackedSegment(baseXid, cached);
+                LOG.trace("Reusing array {} in segment {}", cached, ret);
+                return ret;
+            }
+        }
+
+        final OutboundQueueEntry[] entries = new OutboundQueueEntry[SEGMENT_SIZE];
+        for (int i = 0; i < SEGMENT_SIZE; ++i) {
+            entries[i] = new OutboundQueueEntry();
+        }
+
+        ret = new StackedSegment(baseXid, entries);
+        LOG.trace("Allocated new segment {}", ret);
+        return ret;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this).add("baseXid", baseXid).add("endXid", endXid).add("completeCount", completeCount).toString();
+    }
+
+    long getBaseXid() {
+        return baseXid;
+    }
+
+    OutboundQueueEntry getEntry(final int offset) {
+        return entries[offset];
+    }
+
+    private boolean xidInRange(final long xid) {
+        return xid < endXid && (xid >= baseXid || baseXid > endXid);
+    }
+
+    private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) {
+        if (response instanceof Error) {
+            final Error err = (Error)response;
+            LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(), err.getCodeString());
+            entry.fail(new DeviceRequestFailedException("Device-side failure", err));
+            return true;
+        } else {
+            return entry.complete(response);
+        }
+    }
+
+    OutboundQueueEntry pairRequest(final OfHeader response) {
+        // Explicitly 'long' to force unboxing before performing operations
+        final long xid = response.getXid();
+        if (!xidInRange(xid)) {
+            LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, entries.length, xid);
+            return null;
+        }
+
+        final int offset = (int)(xid - baseXid);
+        final OutboundQueueEntry entry = entries[offset];
+        if (entry.isCompleted()) {
+            LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
+            return null;
+        }
+
+        if (entry.isBarrier()) {
+            // This has been a barrier -- make sure we complete all preceding requests.
+            // XXX: Barriers are expected to complete in one message.
+            //      If this assumption is changed, this logic will need to be expanded
+            //      to ensure that the requests implied by the barrier are reported as
+            //      completed *after* the barrier.
+            LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
+            completeRequests(offset);
+            lastBarrierOffset = offset;
+
+            final boolean success = completeEntry(entry, response);
+            Verify.verify(success, "Barrier request failed to complete");
+            completeCount++;
+        } else if (completeEntry(entry, response)) {
+            completeCount++;
+        }
+
+        return entry;
+    }
+
+    private void completeRequests(final int toOffset) {
+        for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
+            final OutboundQueueEntry entry = entries[i];
+            if (!entry.isCompleted() && entry.complete(null)) {
+                completeCount++;
+            }
+        }
+    }
+
+    void completeAll() {
+        completeRequests(entries.length);
+    }
+
+    int failAll(final OutboundQueueException cause) {
+        int ret = 0;
+        for (int i = lastBarrierOffset + 1; i < entries.length; ++i) {
+            final OutboundQueueEntry entry = entries[i];
+            if (!entry.isCommitted()) {
+                break;
+            }
+
+            if (!entry.isCompleted()) {
+                entry.fail(cause);
+                ret++;
+            }
+        }
+
+        return ret;
+    }
+
+    boolean isComplete() {
+        return completeCount >= entries.length;
+    }
+
+    void recycle() {
+        for (OutboundQueueEntry e : entries) {
+            e.reset();
+        }
+
+        CACHE.offer(new QueueRef(REF_QUEUE, entries));
+    }
+}