Run flush immediately when channel becomes writable
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueImpl.java
index 1bb9ec4bc1cfaf4c000709bc47d9c43b76a48b7a..c4feb26af88c3680770cf1e311fb7c79e4f67776 100644 (file)
@@ -8,11 +8,14 @@
 package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import com.google.common.util.concurrent.FutureCallback;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.locks.LockSupport;
 import javax.annotation.Nonnull;
+import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -21,7 +24,8 @@ final class OutboundQueueImpl implements OutboundQueue {
     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class);
     private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> CURRENT_OFFSET_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
-    private static final long FLUSH_RETRY_NANOS = 1L;
+    private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> BARRIER_OFFSET_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "barrierOffset");
     private final OutboundQueueManager<?> manager;
     private final OutboundQueueEntry[] queue;
     private final long baseXid;
@@ -29,11 +33,13 @@ final class OutboundQueueImpl implements OutboundQueue {
     private final int reserve;
 
     // Updated concurrently
-    private volatile int reserveOffset;
+    private volatile int barrierOffset = -1;
+    private volatile int reserveOffset = 0;
 
     // Updated from Netty only
     private int flushOffset;
     private int completeCount;
+    private int lastBarrierOffset = -1;
 
     OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final int maxQueue) {
         /*
@@ -60,16 +66,56 @@ final class OutboundQueueImpl implements OutboundQueue {
         this.baseXid = baseXid;
         this.endXid = baseXid + queue.length;
         this.reserve = queue.length - 1;
+    }
+
+    void retire() {
         for (OutboundQueueEntry element : queue) {
             element.reset();
         }
     }
 
-    OutboundQueueImpl reuse(final long baseXid) {
+    OutboundQueueImpl reuse(final OutboundQueueManager<?> manager, final long baseXid) {
         return new OutboundQueueImpl(manager, baseXid, queue);
     }
 
-    Long reserveEntry(final boolean forBarrier) {
+    @Override
+    public Long reserveEntry() {
+        return reserveEntry(false);
+    }
+
+    @Override
+    public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
+        final int offset = (int)(xid - baseXid);
+        if (message != null) {
+            Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
+        }
+
+        final int ro = reserveOffset;
+        Preconditions.checkArgument(offset < ro, "Unexpected commit to offset %s reserved %s message %s", offset, ro, message);
+
+        final OutboundQueueEntry entry = queue[offset];
+        entry.commit(message, callback);
+        LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, ro);
+
+        if (entry.isBarrier()) {
+            int my = offset;
+            for (;;) {
+                final int prev = BARRIER_OFFSET_UPDATER.getAndSet(this, my);
+                if (prev < my) {
+                    LOG.debug("Queue {} recorded pending barrier offset {}", this, my);
+                    break;
+                }
+
+                // We have traveled back, recover
+                LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
+                my = prev;
+            }
+        }
+
+        manager.ensureFlushing(this);
+    }
+
+    private Long reserveEntry(final boolean forBarrier) {
         final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this);
         if (offset >= reserve) {
             if (forBarrier) {
@@ -86,22 +132,32 @@ final class OutboundQueueImpl implements OutboundQueue {
         return xid;
     }
 
-    @Override
-    public Long reserveEntry() {
-        return reserveEntry(false);
+    Long reserveBarrierIfNeeded() {
+        final int bo = barrierOffset;
+        if (bo >= flushOffset) {
+            LOG.debug("Barrier found at offset {} (currently at {})", bo, flushOffset);
+            return null;
+        } else {
+            return reserveEntry(true);
+        }
     }
 
-    @Override
-    public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
-        final int offset = (int)(xid - baseXid);
-        if (message != null) {
-            Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
-        }
+    int startShutdown() {
+        // Increment the offset by the queue size, hence preventing any normal
+        // allocations. We should not be seeing a barrier reservation after this
+        // and if there is one issued, we can disregard it.
+        final int offset = CURRENT_OFFSET_UPDATER.getAndAdd(this, queue.length);
 
-        queue[offset].commit(message, callback);
-        LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
+        // If this offset is larger than reserve, trim it. That is not an accurate
+        // view of which slot was actually "reserved", but it indicates at which
+        // entry we can declare the queue flushed (e.g. at the emergency slot).
+        return offset > reserve ? reserve : offset;
+    }
 
-        manager.ensureFlushing(this);
+    boolean isShutdown(final int offset) {
+        // This queue is shutdown if the flushOffset (e.g. the next entry to
+        // be flushed) points to the offset 'reserved' in startShutdown()
+        return flushOffset >= offset;
     }
 
     /**
@@ -109,7 +165,7 @@ final class OutboundQueueImpl implements OutboundQueue {
      *
      * @return True if this queue does not have unprocessed entries.
      */
-    boolean isEmpty() {
+    private boolean isEmpty() {
         int ro = reserveOffset;
         if (ro >= reserve) {
             if (queue[reserve].isCommitted()) {
@@ -139,7 +195,7 @@ final class OutboundQueueImpl implements OutboundQueue {
     }
 
     boolean isFlushed() {
-        LOG.debug("Check queue {} for completeness (offset {}, reserve {})", flushOffset, reserve);
+        LOG.debug("Check queue {} for completeness (offset {}, reserve {})", this, flushOffset, reserve);
         if (flushOffset < reserve) {
             return false;
         }
@@ -148,37 +204,63 @@ final class OutboundQueueImpl implements OutboundQueue {
         return flushOffset >= queue.length || !queue[reserve].isCommitted();
     }
 
+    boolean needsFlush() {
+        if (flushOffset < reserve) {
+            return queue[flushOffset].isCommitted();
+        }
+
+        if (isFlushed()) {
+            LOG.trace("Queue {} is flushed, schedule a replace", this);
+            return true;
+        }
+        if (isFinished()) {
+            LOG.trace("Queue {} is finished, schedule a cleanup", this);
+            return true;
+        }
+
+        return false;
+    }
+
     OfHeader flushEntry() {
         for (;;) {
             // No message ready
             if (isEmpty()) {
-                LOG.debug("Flush offset {} is uptodate with reserved", flushOffset);
+                LOG.trace("Flushed all reserved entries up to {}", flushOffset);
                 return null;
             }
 
-            boolean retry = true;
-            while (!queue[flushOffset].isCommitted()) {
-                if (!retry) {
-                    LOG.debug("Offset {} not ready yet, giving up", flushOffset);
-                    return null;
-                }
-
-                LOG.debug("Offset {} not ready yet, retrying", flushOffset);
-                LockSupport.parkNanos(FLUSH_RETRY_NANOS);
-                retry = false;
+            final OutboundQueueEntry entry = queue[flushOffset];
+            if (!entry.isCommitted()) {
+                LOG.trace("Request at offset {} not ready yet, giving up", flushOffset);
+                return null;
             }
 
-            final OfHeader msg = queue[flushOffset++].getMessage();
+            final OfHeader msg = entry.takeMessage();
+            flushOffset++;
             if (msg != null) {
                 return msg;
             }
+
+            LOG.trace("Null message, skipping to offset {}", flushOffset);
         }
     }
 
-    private boolean xidInRance(final long xid) {
+    // Argument is 'long' to explicitly convert before performing operations
+    private boolean xidInRange(final long xid) {
         return xid < endXid && (xid >= baseXid || baseXid > endXid);
     }
 
+    private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) {
+        if (response instanceof Error) {
+            final Error err = (Error)response;
+            LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(), err.getCodeString());
+            entry.fail(new DeviceRequestFailedException("Device-side failure", err));
+            return true;
+        } else {
+            return entry.complete(response);
+        }
+    }
+
     /**
      * Return the request entry corresponding to a response. Returns null
      * if there is no request matching the response.
@@ -188,7 +270,7 @@ final class OutboundQueueImpl implements OutboundQueue {
      */
     OutboundQueueEntry pairRequest(@Nonnull final OfHeader response) {
         final Long xid = response.getXid();
-        if (!xidInRance(xid)) {
+        if (!xidInRange(xid)) {
             LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, queue.length, xid);
             return null;
         }
@@ -200,34 +282,47 @@ final class OutboundQueueImpl implements OutboundQueue {
             return null;
         }
 
-        if (entry.complete(response)) {
-            completeCount++;
+        if (entry.isBarrier()) {
+            // This has been a barrier -- make sure we complete all preceding requests.
+            // XXX: Barriers are expected to complete in one message.
+            //      If this assumption is changed, this logic will need to be expanded
+            //      to ensure that the requests implied by the barrier are reported as
+            //      completed *after* the barrier.
+            LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
+            completeRequests(offset);
+            lastBarrierOffset = offset;
 
-            // This has been a barrier -- make sure we complete all preceding requests
-            if (entry.isBarrier()) {
-                LOG.debug("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid, xid - 1);
-                for (int i = 0; i < offset; ++i) {
-                    final OutboundQueueEntry e = queue[i];
-                    if (!e.isCompleted() && e.complete(null)) {
-                        completeCount++;
-                    }
-                }
-            }
+            final boolean success = completeEntry(entry, response);
+            Verify.verify(success, "Barrier request failed to complete");
+            completeCount++;
+        } else if (completeEntry(entry, response)) {
+            completeCount++;
         }
+
         return entry;
     }
 
-    void completeAll() {
-        for (OutboundQueueEntry entry : queue) {
+    private void completeRequests(final int toOffset) {
+        for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
+            final OutboundQueueEntry entry = queue[i];
             if (!entry.isCompleted() && entry.complete(null)) {
                 completeCount++;
             }
         }
     }
 
-    int failAll(final Throwable cause) {
+    void completeAll() {
+        completeRequests(queue.length);
+    }
+
+    int failAll(final OutboundQueueException cause) {
         int ret = 0;
-        for (OutboundQueueEntry entry : queue) {
+        for (int i = lastBarrierOffset + 1; i < queue.length; ++i) {
+            final OutboundQueueEntry entry = queue[i];
+            if (!entry.isCommitted()) {
+                break;
+            }
+
             if (!entry.isCompleted()) {
                 entry.fail(cause);
                 ret++;