X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflow-protocol-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowjava%2Fprotocol%2Fimpl%2Fcore%2Fconnection%2FOutboundQueueImpl.java;h=c9c28c16769cdbf4a498bf92f37de0e1ea447971;hb=1ef77ad63ab66eed7dd5404301964db8291004b1;hp=f133082ff76e41ff8455dcc5a607ef4b7bf7c15b;hpb=eea277b72dc9337bb06bb6f2ff470830cc98e74c;p=openflowjava.git diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java index f133082f..c9c28c16 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java @@ -8,10 +8,14 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import com.google.common.util.concurrent.FutureCallback; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import javax.annotation.Nonnull; +import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,9 +87,12 @@ final class OutboundQueueImpl implements OutboundQueue { Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid); } + final int ro = reserveOffset; + Preconditions.checkArgument(offset < ro, "Unexpected commit to offset %s reserved %s message %s", offset, ro, message); + final OutboundQueueEntry entry = queue[offset]; entry.commit(message, callback); - LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset); + LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, ro); if (entry.isBarrier()) { int my = offset; @@ -131,12 +138,30 @@ final class OutboundQueueImpl implements OutboundQueue { } } + int startShutdown() { + // Increment the offset by the queue size, hence preventing any normal + // allocations. We should not be seeing a barrier reservation after this + // and if there is one issued, we can disregard it. + final int offset = CURRENT_OFFSET_UPDATER.getAndAdd(this, queue.length); + + // If this offset is larger than reserve, trim it. That is not an accurate + // view of which slot was actually "reserved", but it indicates at which + // entry we can declare the queue flushed (e.g. at the emergency slot). + return offset > reserve ? reserve : offset; + } + + boolean isShutdown(final int offset) { + // This queue is shutdown if the flushOffset (e.g. the next entry to + // be flushed) points to the offset 'reserved' in startShutdown() + return flushOffset >= offset; + } + /** * An empty queue is a queue which has no further unflushed entries. * * @return True if this queue does not have unprocessed entries. */ - boolean isEmpty() { + private boolean isEmpty() { int ro = reserveOffset; if (ro >= reserve) { if (queue[reserve].isCommitted()) { @@ -166,7 +191,7 @@ final class OutboundQueueImpl implements OutboundQueue { } boolean isFlushed() { - LOG.debug("Check queue {} for completeness (offset {}, reserve {})", flushOffset, reserve); + LOG.debug("Check queue {} for completeness (offset {}, reserve {})", this, flushOffset, reserve); if (flushOffset < reserve) { return false; } @@ -175,11 +200,28 @@ final class OutboundQueueImpl implements OutboundQueue { return flushOffset >= queue.length || !queue[reserve].isCommitted(); } + boolean needsFlush() { + if (flushOffset < reserve) { + return queue[flushOffset].isCommitted(); + } + + if (isFlushed()) { + LOG.trace("Queue {} is flushed, schedule a replace", this); + return true; + } + if (isFinished()) { + LOG.trace("Queue {} is finished, schedule a cleanup", this); + return true; + } + + return false; + } + OfHeader flushEntry() { for (;;) { // No message ready if (isEmpty()) { - LOG.trace("Flushed all reserved entries up to ", flushOffset); + LOG.trace("Flushed all reserved entries up to {}", flushOffset); return null; } @@ -199,10 +241,22 @@ final class OutboundQueueImpl implements OutboundQueue { } } - private boolean xidInRance(final long xid) { + // Argument is 'long' to explicitly convert before performing operations + private boolean xidInRange(final long xid) { return xid < endXid && (xid >= baseXid || baseXid > endXid); } + private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) { + if (response instanceof Error) { + final Error err = (Error)response; + LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(), err.getCodeString()); + entry.fail(new DeviceRequestFailedException("Device-side failure", err)); + return true; + } else { + return entry.complete(response); + } + } + /** * Return the request entry corresponding to a response. Returns null * if there is no request matching the response. @@ -212,7 +266,7 @@ final class OutboundQueueImpl implements OutboundQueue { */ OutboundQueueEntry pairRequest(@Nonnull final OfHeader response) { final Long xid = response.getXid(); - if (!xidInRance(xid)) { + if (!xidInRange(xid)) { LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, queue.length, xid); return null; } @@ -224,16 +278,23 @@ final class OutboundQueueImpl implements OutboundQueue { return null; } - if (entry.complete(response)) { + if (entry.isBarrier()) { + // This has been a barrier -- make sure we complete all preceding requests. + // XXX: Barriers are expected to complete in one message. + // If this assumption is changed, this logic will need to be expanded + // to ensure that the requests implied by the barrier are reported as + // completed *after* the barrier. + LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1); + completeRequests(offset); + lastBarrierOffset = offset; + + final boolean success = completeEntry(entry, response); + Verify.verify(success, "Barrier request failed to complete"); + completeCount++; + } else if (completeEntry(entry, response)) { completeCount++; - - // This has been a barrier -- make sure we complete all preceding requests - if (entry.isBarrier()) { - LOG.debug("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1); - completeRequests(offset); - lastBarrierOffset = offset; - } } + return entry; } @@ -250,10 +311,14 @@ final class OutboundQueueImpl implements OutboundQueue { completeRequests(queue.length); } - int failAll(final Throwable cause) { + int failAll(final OutboundQueueException cause) { int ret = 0; for (int i = lastBarrierOffset + 1; i < queue.length; ++i) { final OutboundQueueEntry entry = queue[i]; + if (!entry.isCommitted()) { + break; + } + if (!entry.isCompleted()) { entry.fail(cause); ret++;