BUG-3219: introduce OutboundQueueException
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueImpl.java
index 5d22435dddf8aaa3f410e22c53c148c9c58bf1f9..f61e8fffa6d86abb58fa1028e425073e01796482 100644 (file)
@@ -8,11 +8,14 @@
 package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import com.google.common.util.concurrent.FutureCallback;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.locks.LockSupport;
 import javax.annotation.Nonnull;
+import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -23,7 +26,6 @@ final class OutboundQueueImpl implements OutboundQueue {
             AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
     private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> BARRIER_OFFSET_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "barrierOffset");
-    private static final long FLUSH_RETRY_NANOS = 1L;
     private final OutboundQueueManager<?> manager;
     private final OutboundQueueEntry[] queue;
     private final long baseXid;
@@ -181,33 +183,42 @@ final class OutboundQueueImpl implements OutboundQueue {
         for (;;) {
             // No message ready
             if (isEmpty()) {
-                LOG.debug("Flush offset {} is uptodate with reserved", flushOffset);
+                LOG.trace("Flushed all reserved entries up to ", flushOffset);
                 return null;
             }
 
-            boolean retry = true;
-            while (!queue[flushOffset].isCommitted()) {
-                if (!retry) {
-                    LOG.debug("Offset {} not ready yet, giving up", flushOffset);
-                    return null;
-                }
-
-                LOG.debug("Offset {} not ready yet, retrying", flushOffset);
-                LockSupport.parkNanos(FLUSH_RETRY_NANOS);
-                retry = false;
+            final OutboundQueueEntry entry = queue[flushOffset];
+            if (!entry.isCommitted()) {
+                LOG.trace("Request at offset {} not ready yet, giving up", flushOffset);
+                return null;
             }
 
-            final OfHeader msg = queue[flushOffset++].getMessage();
+            final OfHeader msg = entry.getMessage();
+            flushOffset++;
             if (msg != null) {
                 return msg;
             }
+
+            LOG.trace("Null message, skipping to offset {}", flushOffset);
         }
     }
 
-    private boolean xidInRance(final long xid) {
+    // Argument is 'long' to explicitly convert before performing operations
+    private boolean xidInRange(final long xid) {
         return xid < endXid && (xid >= baseXid || baseXid > endXid);
     }
 
+    private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) {
+        if (response instanceof Error) {
+            final Error err = (Error)response;
+            LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(), err.getCodeString());
+            entry.fail(new DeviceRequestFailedException("Device-side failure", err));
+            return true;
+        } else {
+            return entry.complete(response);
+        }
+    }
+
     /**
      * Return the request entry corresponding to a response. Returns null
      * if there is no request matching the response.
@@ -217,7 +228,7 @@ final class OutboundQueueImpl implements OutboundQueue {
      */
     OutboundQueueEntry pairRequest(@Nonnull final OfHeader response) {
         final Long xid = response.getXid();
-        if (!xidInRance(xid)) {
+        if (!xidInRange(xid)) {
             LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, queue.length, xid);
             return null;
         }
@@ -229,16 +240,23 @@ final class OutboundQueueImpl implements OutboundQueue {
             return null;
         }
 
-        if (entry.complete(response)) {
+        if (entry.isBarrier()) {
+            // This has been a barrier -- make sure we complete all preceding requests.
+            // XXX: Barriers are expected to complete in one message.
+            //      If this assumption is changed, this logic will need to be expanded
+            //      to ensure that the requests implied by the barrier are reported as
+            //      completed *after* the barrier.
+            LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
+            completeRequests(offset);
+            lastBarrierOffset = offset;
+
+            final boolean success = completeEntry(entry, response);
+            Verify.verify(success, "Barrier request failed to complete");
+            completeCount++;
+        } else if (completeEntry(entry, response)) {
             completeCount++;
-
-            // This has been a barrier -- make sure we complete all preceding requests
-            if (entry.isBarrier()) {
-                LOG.debug("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
-                completeRequests(offset);
-                lastBarrierOffset = offset;
-            }
         }
+
         return entry;
     }
 
@@ -255,7 +273,7 @@ final class OutboundQueueImpl implements OutboundQueue {
         completeRequests(queue.length);
     }
 
-    int failAll(final Throwable cause) {
+    int failAll(final OutboundQueueException cause) {
         int ret = 0;
         for (int i = lastBarrierOffset + 1; i < queue.length; ++i) {
             final OutboundQueueEntry entry = queue[i];