BUG-3219: Fix OutboundQueue cleanup on channel failure
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueImpl.java
index f133082ff76e41ff8455dcc5a607ef4b7bf7c15b..6713d5ceab000eb5df956833381dfd4af640674e 100644 (file)
@@ -8,10 +8,14 @@
 package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import com.google.common.util.concurrent.FutureCallback;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import javax.annotation.Nonnull;
+import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -199,10 +203,22 @@ final class OutboundQueueImpl implements OutboundQueue {
         }
     }
 
-    private boolean xidInRance(final long xid) {
+    // Argument is 'long' to explicitly convert before performing operations
+    private boolean xidInRange(final long xid) {
         return xid < endXid && (xid >= baseXid || baseXid > endXid);
     }
 
+    private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) {
+        if (response instanceof Error) {
+            final Error err = (Error)response;
+            LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(), err.getCodeString());
+            entry.fail(new DeviceRequestFailedException("Device-side failure", err));
+            return true;
+        } else {
+            return entry.complete(response);
+        }
+    }
+
     /**
      * Return the request entry corresponding to a response. Returns null
      * if there is no request matching the response.
@@ -212,7 +228,7 @@ final class OutboundQueueImpl implements OutboundQueue {
      */
     OutboundQueueEntry pairRequest(@Nonnull final OfHeader response) {
         final Long xid = response.getXid();
-        if (!xidInRance(xid)) {
+        if (!xidInRange(xid)) {
             LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, queue.length, xid);
             return null;
         }
@@ -224,16 +240,23 @@ final class OutboundQueueImpl implements OutboundQueue {
             return null;
         }
 
-        if (entry.complete(response)) {
+        if (entry.isBarrier()) {
+            // This has been a barrier -- make sure we complete all preceding requests.
+            // XXX: Barriers are expected to complete in one message.
+            //      If this assumption is changed, this logic will need to be expanded
+            //      to ensure that the requests implied by the barrier are reported as
+            //      completed *after* the barrier.
+            LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
+            completeRequests(offset);
+            lastBarrierOffset = offset;
+
+            final boolean success = completeEntry(entry, response);
+            Verify.verify(success, "Barrier request failed to complete");
+            completeCount++;
+        } else if (completeEntry(entry, response)) {
             completeCount++;
-
-            // This has been a barrier -- make sure we complete all preceding requests
-            if (entry.isBarrier()) {
-                LOG.debug("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
-                completeRequests(offset);
-                lastBarrierOffset = offset;
-            }
         }
+
         return entry;
     }
 
@@ -250,10 +273,14 @@ final class OutboundQueueImpl implements OutboundQueue {
         completeRequests(queue.length);
     }
 
-    int failAll(final Throwable cause) {
+    int failAll(final OutboundQueueException cause) {
         int ret = 0;
         for (int i = lastBarrierOffset + 1; i < queue.length; ++i) {
             final OutboundQueueEntry entry = queue[i];
+            if (!entry.isCommitted()) {
+                break;
+            }
+
             if (!entry.isCompleted()) {
                 entry.fail(cause);
                 ret++;