final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
- final OutboundQueueManager<T> ret = new OutboundQueueManager<>(this, address, handler, OutboundQueueCache.getInstance().getSlice(maxQueueDepth), maxBarrierNanos);
+ final OutboundQueueManager<T> ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
outputManager = ret;
channel.pipeline().addLast(outputManager);
+++ /dev/null
-/*
- * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowjava.protocol.impl.core.connection;
-
-import java.util.HashMap;
-import java.util.Map;
-
-final class OutboundQueueCache {
- private static final OutboundQueueCache INSTANCE = new OutboundQueueCache();
-
- private final Map<Integer, OutboundQueueCacheSlice> slices = new HashMap<>();
-
- private OutboundQueueCache() {
-
- }
-
- static OutboundQueueCache getInstance() {
- return INSTANCE;
- }
-
- synchronized OutboundQueueCacheSlice getSlice(final int queueSize) {
- final OutboundQueueCacheSlice oldSlice = slices.get(queueSize);
- if (oldSlice != null) {
- oldSlice.incRef();
- return oldSlice;
- }
-
- final OutboundQueueCacheSlice newSlice = new OutboundQueueCacheSlice(queueSize);
- slices.put(queueSize, newSlice);
- return newSlice;
- }
-
- synchronized void putSlice(final OutboundQueueCacheSlice slice) {
- if (slice.decRef()) {
- slices.remove(slice.getQueueSize());
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowjava.protocol.impl.core.connection;
-
-import com.google.common.base.FinalizableReferenceQueue;
-import com.google.common.base.FinalizableSoftReference;
-import com.google.common.base.Preconditions;
-import java.lang.ref.Reference;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-final class OutboundQueueCacheSlice {
- private static final class QueueRef extends FinalizableSoftReference<OutboundQueueImpl> {
- private final Collection<?> cache;
-
- protected QueueRef(final FinalizableReferenceQueue queue, final Collection<?> cache, final OutboundQueueImpl referent) {
- super(referent, queue);
- this.cache = Preconditions.checkNotNull(cache);
- }
-
- @Override
- public void finalizeReferent() {
- cache.remove(this);
- }
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueCacheSlice.class);
- private final FinalizableReferenceQueue refQueue = new FinalizableReferenceQueue();
-
- private final ConcurrentLinkedDeque<QueueRef> cache = new ConcurrentLinkedDeque<>();
- private final int queueSize;
- private int refCount = 1;
-
- OutboundQueueCacheSlice(final int queueSize) {
- Preconditions.checkArgument(queueSize >= 1);
- this.queueSize = queueSize;
- }
-
- void remove(final QueueRef queueRef) {
- cache.remove(queueRef);
- }
-
- boolean decRef() {
- if (--refCount == 0) {
- refQueue.close();
- return true;
- } else {
- return false;
- }
- }
-
- void incRef() {
- refCount++;
- }
-
- int getQueueSize() {
- return queueSize;
- }
-
- OutboundQueueImpl getQueue(final OutboundQueueManager<?> manager, final long baseXid) {
- final OutboundQueueImpl ret;
- OutboundQueueImpl cached = null;
- for (;;) {
- final Reference<OutboundQueueImpl> item = cache.pollLast();
- if (item == null) {
- break;
- }
-
- cached = item.get();
- if (cached != null) {
- ret = cached.reuse(manager, baseXid);
- LOG.trace("Reusing queue {} as {} on manager {}", cached, ret, manager);
- return ret;
- }
- }
-
- ret = new OutboundQueueImpl(manager, baseXid, queueSize + 1);
- LOG.trace("Allocated new queue {} on manager {}", ret, manager);
- return ret;
- }
-
- void putQueue(final OutboundQueueImpl queue) {
- queue.retire();
- if (cache.offer(new QueueRef(refQueue, cache, queue))) {
- LOG.trace("Saving queue {} for later reuse", queue);
- } else {
- LOG.trace("Queue {} thrown away", queue);
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowjava.protocol.impl.core.connection;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
-import com.google.common.util.concurrent.FutureCallback;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import javax.annotation.Nonnull;
-import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-final class OutboundQueueImpl implements OutboundQueue {
- private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class);
- private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> CURRENT_OFFSET_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
- private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> BARRIER_OFFSET_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "barrierOffset");
- private final OutboundQueueManager<?> manager;
- private final OutboundQueueEntry[] queue;
- private final long baseXid;
- private final long endXid;
- private final int reserve;
-
- // Updated concurrently
- private volatile int barrierOffset = -1;
- private volatile int reserveOffset = 0;
-
- // Updated from Netty only
- private int flushOffset;
- private int completeCount;
- private int lastBarrierOffset = -1;
-
- OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final int maxQueue) {
- /*
- * We use the last entry as an emergency should a timeout-triggered
- * flush request race with normal users for the last entry in this
- * queue. In that case the flush request will take the last entry and
- * schedule a flush, which means that we will get around sending the
- * message as soon as the user finishes the reservation.
- */
- Preconditions.checkArgument(maxQueue > 1);
- this.baseXid = baseXid;
- this.endXid = baseXid + maxQueue;
- this.reserve = maxQueue - 1;
- this.manager = Preconditions.checkNotNull(manager);
- queue = new OutboundQueueEntry[maxQueue];
- for (int i = 0; i < maxQueue; ++i) {
- queue[i] = new OutboundQueueEntry();
- }
- }
-
- private OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final OutboundQueueEntry[] queue) {
- this.manager = Preconditions.checkNotNull(manager);
- this.queue = Preconditions.checkNotNull(queue);
- this.baseXid = baseXid;
- this.endXid = baseXid + queue.length;
- this.reserve = queue.length - 1;
- }
-
- void retire() {
- for (OutboundQueueEntry element : queue) {
- element.reset();
- }
- }
-
- OutboundQueueImpl reuse(final OutboundQueueManager<?> manager, final long baseXid) {
- return new OutboundQueueImpl(manager, baseXid, queue);
- }
-
- @Override
- public Long reserveEntry() {
- return reserveEntry(false);
- }
-
- @Override
- public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
- final int offset = (int)(xid - baseXid);
- if (message != null) {
- Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
- }
-
- final int ro = reserveOffset;
- Preconditions.checkArgument(offset < ro, "Unexpected commit to offset %s reserved %s message %s", offset, ro, message);
-
- final OutboundQueueEntry entry = queue[offset];
- entry.commit(message, callback);
- LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, ro);
-
- if (entry.isBarrier()) {
- int my = offset;
- for (;;) {
- final int prev = BARRIER_OFFSET_UPDATER.getAndSet(this, my);
- if (prev < my) {
- LOG.debug("Queue {} recorded pending barrier offset {}", this, my);
- break;
- }
-
- // We have traveled back, recover
- LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
- my = prev;
- }
- }
-
- manager.ensureFlushing(this);
- }
-
- private Long reserveEntry(final boolean forBarrier) {
- final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this);
- if (offset >= reserve) {
- if (forBarrier) {
- LOG.debug("Queue {} offset {}/{}, using emergency slot", this, offset, queue.length);
- return endXid;
- } else {
- LOG.debug("Queue {} offset {}/{}, not allowing reservation", this, offset, queue.length);
- return null;
- }
- }
-
- final Long xid = baseXid + offset;
- LOG.debug("Queue {} allocated XID {} at offset {}", this, xid, offset);
- return xid;
- }
-
- Long reserveBarrierIfNeeded() {
- final int bo = barrierOffset;
- if (bo >= flushOffset) {
- LOG.debug("Barrier found at offset {} (currently at {})", bo, flushOffset);
- return null;
- } else {
- return reserveEntry(true);
- }
- }
-
- int startShutdown() {
- // Increment the offset by the queue size, hence preventing any normal
- // allocations. We should not be seeing a barrier reservation after this
- // and if there is one issued, we can disregard it.
- final int offset = CURRENT_OFFSET_UPDATER.getAndAdd(this, queue.length);
-
- // If this offset is larger than reserve, trim it. That is not an accurate
- // view of which slot was actually "reserved", but it indicates at which
- // entry we can declare the queue flushed (e.g. at the emergency slot).
- return offset > reserve ? reserve : offset;
- }
-
- boolean isShutdown(final int offset) {
- // This queue is shutdown if the flushOffset (e.g. the next entry to
- // be flushed) points to the offset 'reserved' in startShutdown()
- return flushOffset >= offset;
- }
-
- /**
- * An empty queue is a queue which has no further unflushed entries.
- *
- * @return True if this queue does not have unprocessed entries.
- */
- private boolean isEmpty() {
- int ro = reserveOffset;
- if (ro >= reserve) {
- if (queue[reserve].isCommitted()) {
- ro = reserve + 1;
- } else {
- ro = reserve;
- }
- }
-
- LOG.debug("Effective flush/reserve offset {}/{}", flushOffset, ro);
- return ro <= flushOffset;
- }
-
- /**
- * A queue is finished when all of its entries have been completed.
- *
- * @return False if there are any uncompleted requests.
- */
- boolean isFinished() {
- if (completeCount < reserve) {
- return false;
- }
-
- // We need to check if the last entry was used
- final OutboundQueueEntry last = queue[reserve];
- return !last.isCommitted() || last.isCompleted();
- }
-
- boolean isFlushed() {
- LOG.debug("Check queue {} for completeness (offset {}, reserve {})", this, flushOffset, reserve);
- if (flushOffset < reserve) {
- return false;
- }
-
- // flushOffset implied == reserve
- return flushOffset >= queue.length || !queue[reserve].isCommitted();
- }
-
- boolean needsFlush() {
- if (flushOffset < reserve) {
- return queue[flushOffset].isCommitted();
- }
-
- if (isFlushed()) {
- LOG.trace("Queue {} is flushed, schedule a replace", this);
- return true;
- }
- if (isFinished()) {
- LOG.trace("Queue {} is finished, schedule a cleanup", this);
- return true;
- }
-
- return false;
- }
-
- OfHeader flushEntry() {
- for (;;) {
- // No message ready
- if (isEmpty()) {
- LOG.trace("Flushed all reserved entries up to {}", flushOffset);
- return null;
- }
-
- final OutboundQueueEntry entry = queue[flushOffset];
- if (!entry.isCommitted()) {
- LOG.trace("Request at offset {} not ready yet, giving up", flushOffset);
- return null;
- }
-
- final OfHeader msg = entry.takeMessage();
- flushOffset++;
- if (msg != null) {
- return msg;
- }
-
- LOG.trace("Null message, skipping to offset {}", flushOffset);
- }
- }
-
- // Argument is 'long' to explicitly convert before performing operations
- private boolean xidInRange(final long xid) {
- return xid < endXid && (xid >= baseXid || baseXid > endXid);
- }
-
- private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) {
- if (response instanceof Error) {
- final Error err = (Error)response;
- LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(), err.getCodeString());
- entry.fail(new DeviceRequestFailedException("Device-side failure", err));
- return true;
- } else {
- return entry.complete(response);
- }
- }
-
- /**
- * Return the request entry corresponding to a response. Returns null
- * if there is no request matching the response.
- *
- * @param response Response message
- * @return Matching request entry, or null if no match is found.
- */
- OutboundQueueEntry pairRequest(@Nonnull final OfHeader response) {
- final Long xid = response.getXid();
- if (!xidInRange(xid)) {
- LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, queue.length, xid);
- return null;
- }
-
- final int offset = (int)(xid - baseXid);
- final OutboundQueueEntry entry = queue[offset];
- if (entry.isCompleted()) {
- LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
- return null;
- }
-
- if (entry.isBarrier()) {
- // This has been a barrier -- make sure we complete all preceding requests.
- // XXX: Barriers are expected to complete in one message.
- // If this assumption is changed, this logic will need to be expanded
- // to ensure that the requests implied by the barrier are reported as
- // completed *after* the barrier.
- LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
- completeRequests(offset);
- lastBarrierOffset = offset;
-
- final boolean success = completeEntry(entry, response);
- Verify.verify(success, "Barrier request failed to complete");
- completeCount++;
- } else if (completeEntry(entry, response)) {
- completeCount++;
- }
-
- return entry;
- }
-
- private void completeRequests(final int toOffset) {
- for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
- final OutboundQueueEntry entry = queue[i];
- if (!entry.isCompleted() && entry.complete(null)) {
- completeCount++;
- }
- }
- }
-
- void completeAll() {
- completeRequests(queue.length);
- }
-
- int failAll(final OutboundQueueException cause) {
- int ret = 0;
- for (int i = lastBarrierOffset + 1; i < queue.length; ++i) {
- final OutboundQueueEntry entry = queue[i];
- if (!entry.isCommitted()) {
- break;
- }
-
- if (!entry.isCompleted()) {
- entry.fail(cause);
- ret++;
- }
- }
-
- return ret;
- }
-}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.InetSocketAddress;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
- /**
- * This is the default upper bound we place on the flush task running
- * a single iteration. We relinquish control after about this amount
- * of time.
- */
- private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
-
- /**
- * We re-check the time spent flushing every this many messages. We do this because
- * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
- * or similar to disable the feature.
- */
- private static final int WORKTIME_RECHECK_MSGS = 64;
-
/**
* Default low write watermark. Channel will become writable when number of outstanding
* bytes dips below this value.
*/
private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
-
- private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
private final AtomicBoolean flushScheduled = new AtomicBoolean();
+ private final StackedOutboundQueue currentQueue;
private final ConnectionAdapterImpl parent;
private final InetSocketAddress address;
+ private final int maxNonBarrierMessages;
private final long maxBarrierNanos;
- private final long maxWorkTime;
private final T handler;
+ // Accessed concurrently
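+ // 'reading' is set for the duration of a channelRead()/channelReadComplete() cycle;
+ // ensureFlushing() consults it to avoid scheduling a redundant flush task while the
+ // Netty thread is going to flush anyway once the read completes.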
+ private volatile boolean reading;
+
// Updated from netty only
- private long lastBarrierNanos = System.nanoTime();
- private OutboundQueueCacheSlice slice;
- private OutboundQueueImpl currentQueue;
+ private boolean alreadyReading;
private boolean barrierTimerEnabled;
+ private long lastBarrierNanos = System.nanoTime();
private int nonBarrierMessages;
- private long lastXid = 0;
- private Integer shutdownOffset;
+ private boolean shuttingDown;
// Passed to executor to request triggering of flush
private final Runnable flushRunnable = new Runnable() {
};
OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
- final OutboundQueueCacheSlice slice, final long maxBarrierNanos) {
+ final int maxNonBarrierMessages, final long maxBarrierNanos) {
this.parent = Preconditions.checkNotNull(parent);
this.handler = Preconditions.checkNotNull(handler);
- this.slice = Preconditions.checkNotNull(slice);
+ Preconditions.checkArgument(maxNonBarrierMessages > 0);
+ this.maxNonBarrierMessages = maxNonBarrierMessages;
Preconditions.checkArgument(maxBarrierNanos > 0);
this.maxBarrierNanos = maxBarrierNanos;
this.address = address;
- this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
- LOG.debug("Queue manager instantiated with queue slice {}", slice);
- createQueue();
+ currentQueue = new StackedOutboundQueue(this);
+ LOG.debug("Queue manager instantiated with queue {}", currentQueue);
+ handler.onConnectionQueueChanged(currentQueue);
}
T getHandler() {
@Override
public void close() {
handler.onConnectionQueueChanged(null);
- if (slice != null) {
- slice.decRef();
- slice = null;
- }
- }
-
- private void createQueue() {
- final long baseXid = lastXid;
- lastXid += slice.getQueueSize() + 1;
-
- final OutboundQueueImpl queue = slice.getQueue(this, baseXid);
- activeQueues.add(queue);
- currentQueue = queue;
- handler.onConnectionQueueChanged(queue);
}
private void scheduleBarrierTimer(final long now) {
LOG.trace("Barrier XID {} scheduled", xid);
}
- /**
- * Flush an entry from the queue.
- *
- * @param now Time reference for 'now'. We take this as an argument, as
- * we need a timestamp to mark barrier messages we see swinging
- * by. That timestamp does not need to be completely accurate,
- * hence we use the flush start time. Alternative would be to
- * measure System.nanoTime() for each barrier -- needlessly
- * adding overhead.
- *
- * @return Entry which was flushed, null if no entry is ready.
- */
- OfHeader flushEntry(final long now) {
- final OfHeader message = currentQueue.flushEntry();
- if (currentQueue.isFlushed()) {
- LOG.debug("Queue {} is fully flushed", currentQueue);
- createQueue();
- }
-
- if (message == null) {
- return null;
- }
-
- if (message instanceof BarrierInput) {
- LOG.trace("Barrier message seen, resetting counters");
- nonBarrierMessages = 0;
- lastBarrierNanos = now;
- } else {
- nonBarrierMessages++;
- if (nonBarrierMessages >= slice.getQueueSize()) {
- LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
- scheduleBarrierMessage();
- } else if (!barrierTimerEnabled) {
- scheduleBarrierTimer(now);
- }
- }
-
- return message;
- }
/**
* Invoked whenever a message comes in from the switch. Runs matching
boolean onMessage(final OfHeader message) {
LOG.trace("Attempting to pair message {} to a request", message);
- Iterator<OutboundQueueImpl> it = activeQueues.iterator();
- while (it.hasNext()) {
- final OutboundQueueImpl queue = it.next();
- final OutboundQueueEntry entry = queue.pairRequest(message);
-
- if (entry == null) {
- continue;
- }
-
- LOG.trace("Queue {} accepted response {}", queue, message);
-
- // This has been a barrier request, we need to flush all
- // previous queues
- if (entry.isBarrier() && activeQueues.size() > 1) {
- LOG.trace("Queue {} indicated request was a barrier", queue);
-
- it = activeQueues.iterator();
- while (it.hasNext()) {
- final OutboundQueueImpl q = it.next();
-
- // We want to complete all queues before the current one, we will
- // complete the current queue below
- if (!queue.equals(q)) {
- LOG.trace("Queue {} is implied finished", q);
- q.completeAll();
- it.remove();
- slice.putQueue(q);
- } else {
- break;
- }
- }
- }
-
- if (queue.isFinished()) {
- LOG.trace("Queue {} is finished", queue);
- it.remove();
- slice.putQueue(queue);
- }
-
- return true;
- }
-
- LOG.debug("Failed to find completion for message {}", message);
- return false;
+ return currentQueue.pairRequest(message);
}
private void scheduleFlush() {
}
}
- void ensureFlushing(final OutboundQueueImpl queue) {
- Preconditions.checkState(currentQueue.equals(queue));
- scheduleFlush();
- }
-
/**
* Periodic barrier check.
*/
protected void barrier() {
LOG.debug("Channel {} barrier timer expired", parent.getChannel());
barrierTimerEnabled = false;
- if (shutdownOffset != null) {
+ if (shuttingDown) {
LOG.trace("Channel shut down, not processing barrier");
return;
}
conditionalFlush();
}
- private void shutdownFlush() {
- long entries = 0;
-
- // Fail all queues
- final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
- while (it.hasNext()) {
- final OutboundQueueImpl queue = it.next();
+ private void writeAndFlush() {
+ final long start = System.nanoTime();
- entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
- if (queue.isFinished()) {
- LOG.trace("Cleared queue {}", queue);
- it.remove();
- }
+ final int entries = currentQueue.writeEntries(parent.getChannel(), start);
+ if (entries > 0) {
+ LOG.debug("Flushing channel {}", parent.getChannel());
+ parent.getChannel().flush();
}
- LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
-
- Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet");
- if (currentQueue.isShutdown(shutdownOffset)) {
- currentQueue = null;
- handler.onConnectionQueueChanged(null);
- LOG.debug("Channel {} shutdown complete", parent.getChannel());
- } else {
- LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
- rescheduleFlush();
+ if (LOG.isDebugEnabled()) {
+ final long stop = System.nanoTime();
+ LOG.debug("Flushed {} messages to channel {} in {}us", entries,
+ parent.getChannel(), TimeUnit.NANOSECONDS.toMicros(stop - start));
}
}
*/
protected void flush() {
// If the channel is gone, just flush whatever is not completed
- if (shutdownOffset != null) {
- shutdownFlush();
- return;
- }
-
- final long start = System.nanoTime();
- final long deadline = start + maxWorkTime;
-
- LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
-
- long messages = 0;
- for (;; ++messages) {
- if (!parent.getChannel().isWritable()) {
- LOG.debug("Channel {} is no longer writable", parent.getChannel());
- break;
- }
-
- final OfHeader message = flushEntry(start);
- if (message == null) {
- LOG.trace("The queue is completely drained");
- break;
- }
-
- final Object wrapper;
- if (address == null) {
- wrapper = new MessageListenerWrapper(message, null);
- } else {
- wrapper = new UdpMessageListenerWrapper(message, null, address);
- }
- parent.getChannel().write(wrapper);
-
- /*
- * Check every WORKTIME_RECHECK_MSGS for exceeded time.
- *
- * XXX: given we already measure our flushing throughput, we
- * should be able to perform dynamic adjustments here.
- * is that additional complexity needed, though?
- */
- if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
- LOG.trace("Exceeded allotted work time {}us",
- TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
- break;
- }
- }
-
- if (messages > 0) {
- LOG.debug("Flushing {} message(s) to channel {}", messages, parent.getChannel());
- parent.getChannel().flush();
+ if (!shuttingDown) {
+ LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
+ writeAndFlush();
+ rescheduleFlush();
+ } else if (currentQueue.finishShutdown()) {
+ handler.onConnectionQueueChanged(null);
+ LOG.debug("Channel {} shutdown complete", parent.getChannel());
+ } else {
+ LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
+ rescheduleFlush();
}
-
- final long stop = System.nanoTime();
- LOG.debug("Flushed {} messages in {}us to channel {}",
- messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
-
- rescheduleFlush();
}
/**
*/
private void conditionalFlush() {
if (currentQueue.needsFlush()) {
- if (shutdownOffset != null || parent.getChannel().isWritable()) {
+ if (shuttingDown || parent.getChannel().isWritable()) {
scheduleFlush();
} else {
LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
}
}
- private void conditionalFlush(final ChannelHandlerContext ctx) {
- Preconditions.checkState(ctx.channel().equals(parent.getChannel()), "Inconsistent channel %s with context %s", parent.getChannel(), ctx);
- conditionalFlush();
- }
-
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
- conditionalFlush(ctx);
+ conditionalFlush();
}
public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
+ // A simple trade-off. While we could write things right away, if there is a task
+ // scheduled, let it have the work
if (flushScheduled.compareAndSet(false, true)) {
LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
flush();
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
- LOG.debug("Channel {} initiating shutdown...", parent.getChannel());
+ LOG.debug("Channel {} initiating shutdown...", ctx.channel());
- /*
- * We are dealing with a multi-threaded shutdown, as the user may still
- * be reserving entries in the queue. We are executing in a netty thread,
- * so neither flush nor barrier can be running, which is good news.
- *
- * We will eat up all the slots in the queue here and mark the offset first
- * reserved offset and free up all the cached queues. We then schedule
- * the flush task, which will deal with the rest of the shutdown process.
- */
- shutdownOffset = currentQueue.startShutdown();
- if (slice != null) {
- slice.decRef();
- slice = null;
- }
+ shuttingDown = true;
+ final long entries = currentQueue.startShutdown(ctx.channel());
+ LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
- LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
scheduleFlush();
}
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
+ // non-volatile read if we are called multiple times
+ if (!alreadyReading) {
+ alreadyReading = true;
+ reading = true;
+ }
+ super.channelRead(ctx, msg);
+ }
+
+ @Override
+ public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
+ super.channelReadComplete(ctx);
+ alreadyReading = false;
+ reading = false;
+
+ // TODO: model this as an atomic gate. We need to sync on it to make sure
+ // that ensureFlushing() suppresses scheduling only if this barrier
+ // has not been crossed.
+ LOG.debug("Opportunistic write on channel {}", parent.getChannel());
+ synchronized (this) {
+ // Run the flush regardless of writability. This is not strictly required, as
+ // there may already be a scheduled flush. Instead of canceling it, which is
+ // expensive, we'll steal its work. Note that more work may accumulate in the
+ // time window between now and when the task runs, so it may not be a no-op
+ // after all.
+ //
+ // The reason for this is to fill the output buffer before we go into the
+ // selection phase. This will make sure the pipe is full (in which case our
+ // next wake-up will be the queue becoming writable).
+ writeAndFlush();
+ }
+ }
+
@Override
public String toString() {
return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
}
+ void ensureFlushing() {
+ // If the channel is not writable, there's no point in waking up,
+ // once we become writable, we will run a full flush
+ if (!parent.getChannel().isWritable()) {
+ return;
+ }
+
+ // We are currently reading something; a quick sync is enough to make sure the
+ // read-complete path will in fact flush our state.
+ if (reading) {
+ synchronized (this) {
+ if (reading) {
+ return;
+ }
+ }
+ }
+
+ // Netty thread is outside our code, we need to schedule a flush
+ // to re-synchronize.
+ scheduleFlush();
+ }
+
void onEchoRequest(final EchoRequestMessage message) {
final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()).setVersion(message.getVersion()).setXid(message.getXid()).build();
parent.getChannel().writeAndFlush(reply);
}
+
+ /**
+ * Write a message into the underlying channel.
+ *
+ *
+ * @param message Message to be written out
+ * @param now Time reference for 'now'. We take this as an argument, as
+ * we need a timestamp to mark barrier messages we see swinging
+ * by. That timestamp does not need to be completely accurate,
+ * hence we use the flush start time. Alternative would be to
+ * measure System.nanoTime() for each barrier -- needlessly
+ * adding overhead.
+ */
+ void writeMessage(final OfHeader message, final long now) {
+ final Object wrapper;
+ if (address == null) {
+ wrapper = new MessageListenerWrapper(message, null);
+ } else {
+ wrapper = new UdpMessageListenerWrapper(message, null, address);
+ }
+ parent.getChannel().write(wrapper);
+
+ if (message instanceof BarrierInput) {
+ LOG.trace("Barrier message seen, resetting counters");
+ nonBarrierMessages = 0;
+ lastBarrierNanos = now;
+ } else {
+ nonBarrierMessages++;
+ if (nonBarrierMessages >= maxNonBarrierMessages) {
+ LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
+ scheduleBarrierMessage();
+ } else if (!barrierTimerEnabled) {
+ scheduleBarrierTimer(now);
+ }
+ }
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.util.concurrent.FutureCallback;
+import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class StackedOutboundQueue implements OutboundQueue {
+ private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class);
+ private static final AtomicLongFieldUpdater<StackedOutboundQueue> BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid");
+ private static final AtomicLongFieldUpdater<StackedOutboundQueue> LAST_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "lastXid");
+
+ @GuardedBy("unflushedSegments")
+ private volatile StackedSegment firstSegment;
+ @GuardedBy("unflushedSegments")
+ private final List<StackedSegment> unflushedSegments = new ArrayList<>(2);
+ @GuardedBy("unflushedSegments")
+ private final List<StackedSegment> uncompletedSegments = new ArrayList<>(2);
+ private final OutboundQueueManager<?> manager;
+
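+ // lastXid is the highest XID handed out so far, barrierXid the highest XID known to
+ // carry a barrier request. Both are advanced through their atomic field updaters.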
+ private volatile long lastXid = -1;
+ private volatile long barrierXid = -1;
+
+ @GuardedBy("unflushedSegments")
+ private Integer shutdownOffset;
+
+ // Accessed from Netty only
+ private int flushOffset;
+
+ StackedOutboundQueue(final OutboundQueueManager<?> manager) {
+ this.manager = Preconditions.checkNotNull(manager);
+ firstSegment = StackedSegment.create(0L);
+ uncompletedSegments.add(firstSegment);
+ unflushedSegments.add(firstSegment);
+ }
+
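+ /**
+ * Make sure unflushedSegments covers the segment into which 'offset' (relative to
+ * the supplied first segment) falls, allocating any intermediate segments as needed.
+ */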
+ @GuardedBy("unflushedSegments")
+ private void ensureSegment(final StackedSegment first, final int offset) {
+ final int segmentOffset = offset / StackedSegment.SEGMENT_SIZE;
+ LOG.debug("Queue {} slow offset {} maps to {} segments {}", this, offset, segmentOffset, unflushedSegments.size());
+
+ for (int i = unflushedSegments.size(); i <= segmentOffset; ++i) {
+ final StackedSegment newSegment = StackedSegment.create(first.getBaseXid() + (StackedSegment.SEGMENT_SIZE * i));
+ LOG.debug("Adding segment {}", newSegment);
+ unflushedSegments.add(newSegment);
+ }
+ }
+
+ /*
+ * This method is expected to be called from multiple threads concurrently.
+ */
+ @Override
+ public Long reserveEntry() {
+ final long xid = LAST_XID_UPDATER.incrementAndGet(this);
+ final StackedSegment fastSegment = firstSegment;
+
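+ // Fast path: if the XID falls into the segment currently published as firstSegment,
+ // there is nothing else to do -- commitEntry() will locate the entry by offset. Otherwise
+ // we have to make sure the backing segment exists, which requires the slow-path lock.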
+ if (xid >= fastSegment.getBaseXid() + StackedSegment.SEGMENT_SIZE) {
+ LOG.debug("Queue {} falling back to slow reservation for XID {}", this, xid);
+
+ // Multiple segments, this is the slow path
+ synchronized (unflushedSegments) {
+ LOG.debug("Queue {} executing slow reservation for XID {}", this, xid);
+
+ // Shutdown was scheduled, need to fail the reservation
+ if (shutdownOffset != null) {
+ LOG.debug("Queue {} is being shutdown, failing reservation", this);
+ return null;
+ }
+
+ // Ensure we have the appropriate segment for the specified XID
+ final StackedSegment slowSegment = firstSegment;
+ final int slowOffset = (int) (xid - slowSegment.getBaseXid());
+ Verify.verify(slowOffset >= 0);
+
+ // Now let's see if we need to allocate a new segment
+ ensureSegment(slowSegment, slowOffset);
+
+ LOG.debug("Queue {} slow reservation finished", this);
+ }
+ }
+
+ LOG.trace("Queue {} allocated XID {}", this, xid);
+ return xid;
+ }
+
+ /*
+ * This method is expected to be called from multiple threads concurrently
+ */
+ @Override
+ public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
+ final StackedSegment fastSegment = firstSegment;
+ final long calcOffset = xid - fastSegment.getBaseXid();
+ Preconditions.checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s", xid, fastSegment.getBaseXid());
+
+ Verify.verify(calcOffset <= Integer.MAX_VALUE);
+ final int fastOffset = (int) calcOffset;
+
+ final OutboundQueueEntry entry;
+ if (fastOffset >= StackedSegment.SEGMENT_SIZE) {
+ LOG.debug("Queue {} falling back to slow commit of XID {} at offset {}", this, xid, fastOffset);
+
+ final StackedSegment segment;
+ final int slowOffset;
+ synchronized (unflushedSegments) {
+ final StackedSegment slowSegment = firstSegment;
+ final long slowCalcOffset = xid - slowSegment.getBaseXid();
+ Verify.verify(slowCalcOffset >= 0 && slowCalcOffset <= Integer.MAX_VALUE);
+ slowOffset = (int) slowCalcOffset;
+
+ LOG.debug("Queue {} recalculated offset of XID {} to {}", this, xid, slowOffset);
+ segment = unflushedSegments.get(slowOffset / StackedSegment.SEGMENT_SIZE);
+ }
+
+ final int segOffset = slowOffset % StackedSegment.SEGMENT_SIZE;
+ entry = segment.getEntry(segOffset);
+ LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this, xid, slowOffset, segment, segOffset);
+ } else {
+ entry = fastSegment.getEntry(fastOffset);
+ }
+
+ entry.commit(message, callback);
+ if (entry.isBarrier()) {
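+ // Publish the highest barrier XID seen so far. Concurrent committers may race us,
+ // so retry the getAndSet() until the recorded value does not move backwards.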
+ long my = xid;
+ for (;;) {
+ final long prev = BARRIER_XID_UPDATER.getAndSet(this, my);
+ if (prev < my) {
+ LOG.debug("Queue {} recorded pending barrier XID {}", this, my);
+ break;
+ }
+
+ // We have traveled back, recover
+ LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
+ my = prev;
+ }
+ }
+
+ LOG.trace("Queue {} committed XID {}", this, xid);
+ manager.ensureFlushing();
+ }
+
+ /**
+ * Write some entries from the queue to the channel. Guaranteed to run
+ * in the corresponding EventLoop.
+ *
+ * @param channel Channel onto which we are writing
+ * @param now Time reference for 'now', used to time-stamp any barrier messages written out
+ * @return Number of entries written out
+ */
+ int writeEntries(@Nonnull final Channel channel, final long now) {
+ // Local cache
+ StackedSegment segment = firstSegment;
+ int entries = 0;
+
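+ // Flush entries in reservation order for as long as the channel stays writable and
+ // the entry at the head has already been committed by its user thread.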
+ while (channel.isWritable()) {
+ final OutboundQueueEntry entry = segment.getEntry(flushOffset);
+ if (!entry.isCommitted()) {
+ LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid() + flushOffset, segment, flushOffset);
+ break;
+ }
+
+ LOG.trace("Queue {} flushing entry at offset {}", this, flushOffset);
+ final OfHeader message = entry.takeMessage();
+ flushOffset++;
+ entries++;
+
+ if (message != null) {
+ manager.writeMessage(message, now);
+ } else {
+ entry.complete(null);
+ }
+
+ if (flushOffset >= StackedSegment.SEGMENT_SIZE) {
+ /*
+ * Slow path: purge the current segment unless it's the last one.
+ * If it is, we leave it for replacement when a new reservation
+ * is run on it.
+ *
+ * This costs us two slow paths, but hey, this should be very rare,
+ * so let's keep things simple.
+ */
+ synchronized (unflushedSegments) {
+ LOG.debug("Flush offset {} unflushed segments {}", flushOffset, unflushedSegments.size());
+
+ // We may have raced ahead of reservation code and need to allocate a segment
+ ensureSegment(segment, flushOffset);
+
+ // Remove the segment, update the firstSegment and reset flushOffset
+ final StackedSegment oldSegment = unflushedSegments.remove(0);
+ if (oldSegment.isComplete()) {
+ uncompletedSegments.remove(oldSegment);
+ oldSegment.recycle();
+ }
+
+ // Reset the first segment and add it to the uncompleted list
+ segment = unflushedSegments.get(0);
+ uncompletedSegments.add(segment);
+
+ // Update the shutdown offset
+ if (shutdownOffset != null) {
+ shutdownOffset -= StackedSegment.SEGMENT_SIZE;
+ }
+
+ // Allow reservations back on the fast path by publishing the new first segment
+ firstSegment = segment;
+
+ flushOffset = 0;
+ LOG.debug("Queue {} flush moved to segment {}", this, segment);
+ }
+ }
+ }
+
+ return entries;
+ }
+
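+ /**
+ * Reserve an entry for a barrier message, unless a barrier is already pending at or
+ * beyond the current flush position.
+ *
+ * @return XID reserved for the barrier, or null if no barrier is needed (or the queue
+ *         is shutting down)
+ */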
+ Long reserveBarrierIfNeeded() {
+ final long bXid = barrierXid;
+ final long fXid = firstSegment.getBaseXid() + flushOffset;
+ if (bXid >= fXid) {
+ LOG.debug("Barrier found at XID {} (currently at {})", bXid, fXid);
+ return null;
+ } else {
+ return reserveEntry();
+ }
+ }
+
+ boolean pairRequest(final OfHeader message) {
+ Iterator<StackedSegment> it = uncompletedSegments.iterator();
+ while (it.hasNext()) {
+ final StackedSegment queue = it.next();
+ final OutboundQueueEntry entry = queue.pairRequest(message);
+ if (entry == null) {
+ continue;
+ }
+
+ LOG.trace("Queue {} accepted response {}", queue, message);
+
+ // This has been a barrier request, we need to flush all
+ // previous queues
+ if (entry.isBarrier() && uncompletedSegments.size() > 1) {
+ LOG.trace("Queue {} indicated request was a barrier", queue);
+
+ it = uncompletedSegments.iterator();
+ while (it.hasNext()) {
+ final StackedSegment q = it.next();
+
+ // We want to complete all queues before the current one, we will
+ // complete the current queue below
+ if (!queue.equals(q)) {
+ LOG.trace("Queue {} is implied finished", q);
+ q.completeAll();
+ it.remove();
+ q.recycle();
+ } else {
+ break;
+ }
+ }
+ }
+
+ if (queue.isComplete()) {
+ LOG.trace("Queue {} is finished", queue);
+ it.remove();
+ queue.recycle();
+ }
+
+ return true;
+ }
+
+ LOG.debug("Failed to find completion for message {}", message);
+ return false;
+ }
+
+ long startShutdown(final Channel channel) {
+ /*
+ * We are dealing with a multi-threaded shutdown, as the user may still
+ * be reserving entries in the queue. We are executing in a netty thread,
+ * so neither flush nor barrier can be running, which is good news.
+ *
+ * We will eat up all the slots in the queue here, mark the first reserved
+ * offset and free up all the cached queues. We then schedule
+ * the flush task, which will deal with the rest of the shutdown process.
+ */
+ synchronized (unflushedSegments) {
+ // Increment the offset by the segment size, preventing fast-path allocations.
+ // Since we are holding the slow-path lock, any reservation taking the slow path
+ // will see the queue in shutdown and fail accordingly.
+ final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
+ shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
+
+ return lockedShutdownFlush();
+ }
+ }
+
+ @GuardedBy("unflushedSegments")
+ private long lockedShutdownFlush() {
+ long entries = 0;
+
+ // Fail all queues
+ final Iterator<StackedSegment> it = uncompletedSegments.iterator();
+ while (it.hasNext()) {
+ final StackedSegment segment = it.next();
+
+ entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
+ if (segment.isComplete()) {
+ LOG.trace("Cleared segment {}", segment);
+ it.remove();
+ }
+ }
+
+ return entries;
+ }
+
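+ /**
+ * Fail any entries committed since the previous pass and check whether the queue
+ * has been completely drained.
+ *
+ * @return True if shutdown has completed and no further flushes are needed
+ */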
+ boolean finishShutdown() {
+ synchronized (unflushedSegments) {
+ lockedShutdownFlush();
+ }
+
+ return !needsFlush();
+ }
+
+ boolean needsFlush() {
+ // flushOffset always points to the next entry to be flushed within firstSegment
+ // and is only ever changed from the Netty thread, so we are fine here.
+ if (firstSegment.getBaseXid() + flushOffset > lastXid) {
+ return false;
+ }
+
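+ // Queue has been shut down and we have already flushed up to the shutdown point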
+ if (shutdownOffset != null && flushOffset >= shutdownOffset) {
+ return false;
+ }
+
+ return firstSegment.getEntry(flushOffset).isCommitted();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import com.google.common.base.FinalizableReferenceQueue;
+import com.google.common.base.FinalizableSoftReference;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import java.lang.ref.Reference;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class StackedSegment {
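+ /*
+ * Arrays of entries are relatively expensive to allocate and populate, so retired
+ * arrays are parked in a soft-reference cache and handed back out by create(). The
+ * JVM remains free to reclaim them under memory pressure, at which point
+ * finalizeReferent() removes the stale reference from the cache.
+ */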
+ private static final class QueueRef extends FinalizableSoftReference<OutboundQueueEntry[]> {
+ QueueRef(final FinalizableReferenceQueue queue, final OutboundQueueEntry[] referent) {
+ super(referent, queue);
+ }
+
+ @Override
+ public void finalizeReferent() {
+ CACHE.remove(this);
+ }
+ }
+
+ /**
+ * Size of each individual segment.
+ */
+ static final int SEGMENT_SIZE = 4096;
+
+ private static final Logger LOG = LoggerFactory.getLogger(StackedSegment.class);
+ private static final FinalizableReferenceQueue REF_QUEUE = new FinalizableReferenceQueue();
+ private static final ConcurrentLinkedDeque<QueueRef> CACHE = new ConcurrentLinkedDeque<>();
+
+ private final OutboundQueueEntry[] entries;
+ private final long baseXid;
+ private final long endXid;
+
+ // Updated from netty only
+ private int lastBarrierOffset = -1;
+ private int completeCount;
+
+ StackedSegment(final long baseXid, final OutboundQueueEntry[] entries) {
+ this.baseXid = baseXid;
+ this.endXid = baseXid + SEGMENT_SIZE;
+ this.entries = Preconditions.checkNotNull(entries);
+ }
+
+ static StackedSegment create(final long baseXid) {
+ final StackedSegment ret;
+ for (;;) {
+ final Reference<OutboundQueueEntry[]> item = CACHE.pollLast();
+ if (item == null) {
+ break;
+ }
+
+ final OutboundQueueEntry[] cached = item.get();
+ if (cached != null) {
+ ret = new StackedSegment(baseXid, cached);
+ LOG.trace("Reusing array {} in segment {}", cached, ret);
+ return ret;
+ }
+ }
+
+ final OutboundQueueEntry[] entries = new OutboundQueueEntry[SEGMENT_SIZE];
+ for (int i = 0; i < SEGMENT_SIZE; ++i) {
+ entries[i] = new OutboundQueueEntry();
+ }
+
+ ret = new StackedSegment(baseXid, entries);
+ LOG.trace("Allocated new segment {}", ret);
+ return ret;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("baseXid", baseXid).add("endXid", endXid).add("completeCount", completeCount).toString();
+ }
+
+ long getBaseXid() {
+ return baseXid;
+ }
+
+ OutboundQueueEntry getEntry(final int offset) {
+ return entries[offset];
+ }
+
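+ // The second condition handles the case where the XID space wraps around inside this
+ // segment, i.e. baseXid is numerically greater than endXid.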
+ private boolean xidInRange(final long xid) {
+ return xid < endXid && (xid >= baseXid || baseXid > endXid);
+ }
+
+ private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) {
+ if (response instanceof Error) {
+ final Error err = (Error)response;
+ LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(), err.getCodeString());
+ entry.fail(new DeviceRequestFailedException("Device-side failure", err));
+ return true;
+ } else {
+ return entry.complete(response);
+ }
+ }
+
+ OutboundQueueEntry pairRequest(final OfHeader response) {
+ // Explicitly 'long' to force unboxing before performing operations
+ final long xid = response.getXid();
+ if (!xidInRange(xid)) {
+ LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, entries.length, xid);
+ return null;
+ }
+
+ final int offset = (int)(xid - baseXid);
+ final OutboundQueueEntry entry = entries[offset];
+ if (entry.isCompleted()) {
+ LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
+ return null;
+ }
+
+ if (entry.isBarrier()) {
+ // This has been a barrier -- make sure we complete all preceding requests.
+ // XXX: Barriers are expected to complete in one message.
+ // If this assumption is changed, this logic will need to be expanded
+ // to ensure that the requests implied by the barrier are reported as
+ // completed *after* the barrier.
+ LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
+ completeRequests(offset);
+ lastBarrierOffset = offset;
+
+ final boolean success = completeEntry(entry, response);
+ Verify.verify(success, "Barrier request failed to complete");
+ completeCount++;
+ } else if (completeEntry(entry, response)) {
+ completeCount++;
+ }
+
+ return entry;
+ }
+
+ private void completeRequests(final int toOffset) {
+ for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
+ final OutboundQueueEntry entry = entries[i];
+ if (!entry.isCompleted() && entry.complete(null)) {
+ completeCount++;
+ }
+ }
+ }
+
+ void completeAll() {
+ completeRequests(entries.length);
+ }
+
+ int failAll(final OutboundQueueException cause) {
+ int ret = 0;
+ for (int i = lastBarrierOffset + 1; i < entries.length; ++i) {
+ final OutboundQueueEntry entry = entries[i];
+ if (!entry.isCommitted()) {
+ break;
+ }
+
+ if (!entry.isCompleted()) {
+ entry.fail(cause);
+ ret++;
+ }
+ }
+
+ return ret;
+ }
+
+ boolean isComplete() {
+ return completeCount >= entries.length;
+ }
+
+ void recycle() {
+ for (OutboundQueueEntry e : entries) {
+ e.reset();
+ }
+
+ CACHE.offer(new QueueRef(REF_QUEUE, entries));
+ }
+}