Run flush immediately when channel becomes writable
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueImpl.java
index 921b4290a43e952d9e7faa90ef5c52e75846e521..c4feb26af88c3680770cf1e311fb7c79e4f67776 100644 (file)
@@ -66,12 +66,15 @@ final class OutboundQueueImpl implements OutboundQueue {
         this.baseXid = baseXid;
         this.endXid = baseXid + queue.length;
         this.reserve = queue.length - 1;
+    }
+
+    void retire() {
         for (OutboundQueueEntry element : queue) {
             element.reset();
         }
     }
 
-    OutboundQueueImpl reuse(final long baseXid) {
+    OutboundQueueImpl reuse(final OutboundQueueManager<?> manager, final long baseXid) {
         return new OutboundQueueImpl(manager, baseXid, queue);
     }
 
@@ -87,9 +90,12 @@ final class OutboundQueueImpl implements OutboundQueue {
             Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
         }
 
+        final int ro = reserveOffset;
+        Preconditions.checkArgument(offset < ro, "Unexpected commit to offset %s reserved %s message %s", offset, ro, message);
+
         final OutboundQueueEntry entry = queue[offset];
         entry.commit(message, callback);
-        LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
+        LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, ro);
 
         if (entry.isBarrier()) {
             int my = offset;
@@ -101,6 +107,7 @@ final class OutboundQueueImpl implements OutboundQueue {
                 }
 
                 // We have traveled back, recover
+                LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
                 my = prev;
             }
         }
@@ -218,7 +225,7 @@ final class OutboundQueueImpl implements OutboundQueue {
         for (;;) {
             // No message ready
             if (isEmpty()) {
-                LOG.trace("Flushed all reserved entries up to ", flushOffset);
+                LOG.trace("Flushed all reserved entries up to {}", flushOffset);
                 return null;
             }
 
@@ -228,7 +235,7 @@ final class OutboundQueueImpl implements OutboundQueue {
                 return null;
             }
 
-            final OfHeader msg = entry.getMessage();
+            final OfHeader msg = entry.takeMessage();
             flushOffset++;
             if (msg != null) {
                 return msg;