BUG-3219: Throw away message as soon as we send it out
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueImpl.java
index e8e2307ca2be3108053d94cba24013377b7e1989..2433637f1bbc72148c89dafec2d38965f76d6759 100644 (file)
@@ -12,8 +12,9 @@ import com.google.common.base.Verify;
 import com.google.common.util.concurrent.FutureCallback;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import javax.annotation.Nonnull;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
 import org.opendaylight.openflowjava.protocol.api.connection.DeviceRequestFailedException;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.slf4j.Logger;
@@ -65,6 +66,9 @@ final class OutboundQueueImpl implements OutboundQueue {
         this.baseXid = baseXid;
         this.endXid = baseXid + queue.length;
         this.reserve = queue.length - 1;
+    }
+
+    void retire() {
         for (OutboundQueueEntry element : queue) {
             element.reset();
         }
@@ -86,9 +90,12 @@ final class OutboundQueueImpl implements OutboundQueue {
             Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
         }
 
+        final int ro = reserveOffset;
+        Preconditions.checkArgument(offset < ro, "Unexpected commit to offset %s reserved %s message %s", offset, ro, message);
+
         final OutboundQueueEntry entry = queue[offset];
         entry.commit(message, callback);
-        LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
+        LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, ro);
 
         if (entry.isBarrier()) {
             int my = offset;
@@ -134,12 +141,30 @@ final class OutboundQueueImpl implements OutboundQueue {
         }
     }
 
+    int startShutdown() {
+        // Increment the offset by the queue size, hence preventing any normal
+        // allocations. We should not be seeing a barrier reservation after this
+        // and if there is one issued, we can disregard it.
+        final int offset = CURRENT_OFFSET_UPDATER.getAndAdd(this, queue.length);
+
+        // If this offset is larger than reserve, trim it. That is not an accurate
+        // view of which slot was actually "reserved", but it indicates at which
+        // entry we can declare the queue flushed (e.g. at the emergency slot).
+        return offset > reserve ? reserve : offset;
+    }
+
+    boolean isShutdown(final int offset) {
+        // This queue is shutdown if the flushOffset (e.g. the next entry to
+        // be flushed) points to the offset 'reserved' in startShutdown()
+        return flushOffset >= offset;
+    }
+
     /**
      * An empty queue is a queue which has no further unflushed entries.
      *
      * @return True if this queue does not have unprocessed entries.
      */
-    boolean isEmpty() {
+    private boolean isEmpty() {
         int ro = reserveOffset;
         if (ro >= reserve) {
             if (queue[reserve].isCommitted()) {
@@ -169,7 +194,7 @@ final class OutboundQueueImpl implements OutboundQueue {
     }
 
     boolean isFlushed() {
-        LOG.debug("Check queue {} for completeness (offset {}, reserve {})", flushOffset, reserve);
+        LOG.debug("Check queue {} for completeness (offset {}, reserve {})", this, flushOffset, reserve);
         if (flushOffset < reserve) {
             return false;
         }
@@ -178,11 +203,28 @@ final class OutboundQueueImpl implements OutboundQueue {
         return flushOffset >= queue.length || !queue[reserve].isCommitted();
     }
 
+    boolean needsFlush() {
+        if (flushOffset < reserve) {
+            return queue[flushOffset].isCommitted();
+        }
+
+        if (isFlushed()) {
+            LOG.trace("Queue {} is flushed, schedule a replace", this);
+            return true;
+        }
+        if (isFinished()) {
+            LOG.trace("Queue {} is finished, schedule a cleanup", this);
+            return true;
+        }
+
+        return false;
+    }
+
     OfHeader flushEntry() {
         for (;;) {
             // No message ready
             if (isEmpty()) {
-                LOG.trace("Flushed all reserved entries up to ", flushOffset);
+                LOG.trace("Flushed all reserved entries up to {}", flushOffset);
                 return null;
             }
 
@@ -192,7 +234,7 @@ final class OutboundQueueImpl implements OutboundQueue {
                 return null;
             }
 
-            final OfHeader msg = entry.getMessage();
+            final OfHeader msg = entry.takeMessage();
             flushOffset++;
             if (msg != null) {
                 return msg;
@@ -272,10 +314,14 @@ final class OutboundQueueImpl implements OutboundQueue {
         completeRequests(queue.length);
     }
 
-    int failAll(final Throwable cause) {
+    int failAll(final OutboundQueueException cause) {
         int ret = 0;
         for (int i = lastBarrierOffset + 1; i < queue.length; ++i) {
             final OutboundQueueEntry entry = queue[i];
+            if (!entry.isCommitted()) {
+                break;
+            }
+
             if (!entry.isCompleted()) {
                 entry.fail(cause);
                 ret++;