package org.opendaylight.openflowjava.protocol.impl.core.connection;
import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
import com.google.common.util.concurrent.FutureCallback;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.locks.LockSupport;
import javax.annotation.Nonnull;
+import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class);
private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> CURRENT_OFFSET_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
- private static final long FLUSH_RETRY_NANOS = 1L;
+ private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> BARRIER_OFFSET_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "barrierOffset");
private final OutboundQueueManager<?> manager;
private final OutboundQueueEntry[] queue;
private final long baseXid;
private final int reserve;
// Updated concurrently
- private volatile int reserveOffset;
+ private volatile int barrierOffset = -1;
+ private volatile int reserveOffset = 0;
// Updated from Netty only
private int flushOffset;
private int completeCount;
+ private int lastBarrierOffset = -1;
OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final int maxQueue) {
/*
this.baseXid = baseXid;
this.endXid = baseXid + queue.length;
this.reserve = queue.length - 1;
+ }
+
+ void retire() {
for (OutboundQueueEntry element : queue) {
element.reset();
}
return new OutboundQueueImpl(manager, baseXid, queue);
}
- Long reserveEntry(final boolean forBarrier) {
+ @Override
+ public Long reserveEntry() {
+ return reserveEntry(false);
+ }
+
+ @Override
+ public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
+ final int offset = (int)(xid - baseXid);
+ if (message != null) {
+ Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
+ }
+
+ final int ro = reserveOffset;
+ Preconditions.checkArgument(offset < ro, "Unexpected commit to offset %s reserved %s message %s", offset, ro, message);
+
+ final OutboundQueueEntry entry = queue[offset];
+ entry.commit(message, callback);
+ LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, ro);
+
+ if (entry.isBarrier()) {
+ int my = offset;
+ for (;;) {
+ final int prev = BARRIER_OFFSET_UPDATER.getAndSet(this, my);
+ if (prev < my) {
+ LOG.debug("Queue {} recorded pending barrier offset {}", this, my);
+ break;
+ }
+
+ // We have traveled back, recover
+ my = prev;
+ }
+ }
+
+ manager.ensureFlushing(this);
+ }
+
+ private Long reserveEntry(final boolean forBarrier) {
final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this);
if (offset >= reserve) {
if (forBarrier) {
return xid;
}
- @Override
- public Long reserveEntry() {
- return reserveEntry(false);
+ Long reserveBarrierIfNeeded() {
+ final int bo = barrierOffset;
+ if (bo >= flushOffset) {
+ LOG.debug("Barrier found at offset {} (currently at {})", bo, flushOffset);
+ return null;
+ } else {
+ return reserveEntry(true);
+ }
}
- @Override
- public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
- final int offset = (int)(xid - baseXid);
- if (message != null) {
- Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
- }
+ int startShutdown() {
+ // Increment the offset by the queue size, hence preventing any normal
+ // allocations. We should not be seeing a barrier reservation after this
+ // and if there is one issued, we can disregard it.
+ final int offset = CURRENT_OFFSET_UPDATER.getAndAdd(this, queue.length);
- queue[offset].commit(message, callback);
- LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
+ // If this offset is larger than reserve, trim it. That is not an accurate
+ // view of which slot was actually "reserved", but it indicates at which
+ // entry we can declare the queue flushed (e.g. at the emergency slot).
+ return offset > reserve ? reserve : offset;
+ }
- manager.ensureFlushing(this);
+ boolean isShutdown(final int offset) {
+ // This queue is shutdown if the flushOffset (e.g. the next entry to
+ // be flushed) points to the offset 'reserved' in startShutdown()
+ return flushOffset >= offset;
}
/**
*
* @return True if this queue does not have unprocessed entries.
*/
- boolean isEmpty() {
+ private boolean isEmpty() {
int ro = reserveOffset;
if (ro >= reserve) {
if (queue[reserve].isCommitted()) {
}
boolean isFlushed() {
- LOG.debug("Check queue {} for completeness (offset {}, reserve {})", flushOffset, reserve);
+ LOG.debug("Check queue {} for completeness (offset {}, reserve {})", this, flushOffset, reserve);
if (flushOffset < reserve) {
return false;
}
return flushOffset >= queue.length || !queue[reserve].isCommitted();
}
+ boolean needsFlush() {
+ if (flushOffset < reserve) {
+ return queue[flushOffset].isCommitted();
+ }
+
+ if (isFlushed()) {
+ LOG.trace("Queue {} is flushed, schedule a replace", this);
+ return true;
+ }
+ if (isFinished()) {
+ LOG.trace("Queue {} is finished, schedule a cleanup", this);
+ return true;
+ }
+
+ return false;
+ }
+
OfHeader flushEntry() {
for (;;) {
// No message ready
if (isEmpty()) {
- LOG.debug("Flush offset {} is uptodate with reserved", flushOffset);
+ LOG.trace("Flushed all reserved entries up to {}", flushOffset);
return null;
}
- boolean retry = true;
- while (!queue[flushOffset].isCommitted()) {
- if (!retry) {
- LOG.debug("Offset {} not ready yet, giving up", flushOffset);
- return null;
- }
-
- LOG.debug("Offset {} not ready yet, retrying", flushOffset);
- LockSupport.parkNanos(FLUSH_RETRY_NANOS);
- retry = false;
+ final OutboundQueueEntry entry = queue[flushOffset];
+ if (!entry.isCommitted()) {
+ LOG.trace("Request at offset {} not ready yet, giving up", flushOffset);
+ return null;
}
- final OfHeader msg = queue[flushOffset++].getMessage();
+ final OfHeader msg = entry.takeMessage();
+ flushOffset++;
if (msg != null) {
return msg;
}
+
+ LOG.trace("Null message, skipping to offset {}", flushOffset);
}
}
- private boolean xidInRance(final long xid) {
+ // Argument is 'long' to explicitly convert before performing operations
+ private boolean xidInRange(final long xid) {
return xid < endXid && (xid >= baseXid || baseXid > endXid);
}
+ private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) {
+ if (response instanceof Error) {
+ final Error err = (Error)response;
+ LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(), err.getCodeString());
+ entry.fail(new DeviceRequestFailedException("Device-side failure", err));
+ return true;
+ } else {
+ return entry.complete(response);
+ }
+ }
+
/**
* Return the request entry corresponding to a response. Returns null
* if there is no request matching the response.
*/
OutboundQueueEntry pairRequest(@Nonnull final OfHeader response) {
final Long xid = response.getXid();
- if (!xidInRance(xid)) {
+ if (!xidInRange(xid)) {
LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, queue.length, xid);
return null;
}
return null;
}
- if (entry.complete(response)) {
- completeCount++;
+ if (entry.isBarrier()) {
+ // This has been a barrier -- make sure we complete all preceding requests.
+ // XXX: Barriers are expected to complete in one message.
+ // If this assumption is changed, this logic will need to be expanded
+ // to ensure that the requests implied by the barrier are reported as
+ // completed *after* the barrier.
+ LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
+ completeRequests(offset);
+ lastBarrierOffset = offset;
- // This has been a barrier -- make sure we complete all preceding requests
- if (entry.isBarrier()) {
- LOG.debug("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid, xid - 1);
- for (int i = 0; i < offset; ++i) {
- final OutboundQueueEntry e = queue[i];
- if (!e.isCompleted() && e.complete(null)) {
- completeCount++;
- }
- }
- }
+ final boolean success = completeEntry(entry, response);
+ Verify.verify(success, "Barrier request failed to complete");
+ completeCount++;
+ } else if (completeEntry(entry, response)) {
+ completeCount++;
}
+
return entry;
}
- void completeAll() {
- for (OutboundQueueEntry entry : queue) {
+ private void completeRequests(final int toOffset) {
+ for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
+ final OutboundQueueEntry entry = queue[i];
if (!entry.isCompleted() && entry.complete(null)) {
completeCount++;
}
}
}
- int failAll(final Throwable cause) {
+ void completeAll() {
+ completeRequests(queue.length);
+ }
+
+ int failAll(final OutboundQueueException cause) {
int ret = 0;
- for (OutboundQueueEntry entry : queue) {
+ for (int i = lastBarrierOffset + 1; i < queue.length; ++i) {
+ final OutboundQueueEntry entry = queue[i];
+ if (!entry.isCommitted()) {
+ break;
+ }
+
if (!entry.isCompleted()) {
entry.fail(cause);
ret++;