BUG-3219: Throw away message as soon as we send it out
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueImpl.java
index 4f708004eb08420629ad09ee8dc24bcc0c19d74c..2433637f1bbc72148c89dafec2d38965f76d6759 100644 (file)
@@ -66,6 +66,9 @@ final class OutboundQueueImpl implements OutboundQueue {
         this.baseXid = baseXid;
         this.endXid = baseXid + queue.length;
         this.reserve = queue.length - 1;
+    }
+
+    void retire() {
         for (OutboundQueueEntry element : queue) {
             element.reset();
         }
@@ -87,9 +90,12 @@ final class OutboundQueueImpl implements OutboundQueue {
             Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
         }
 
+        final int ro = reserveOffset;
+        Preconditions.checkArgument(offset < ro, "Unexpected commit to offset %s reserved %s message %s", offset, ro, message);
+
         final OutboundQueueEntry entry = queue[offset];
         entry.commit(message, callback);
-        LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
+        LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, ro);
 
         if (entry.isBarrier()) {
             int my = offset;
@@ -188,7 +194,7 @@ final class OutboundQueueImpl implements OutboundQueue {
     }
 
     boolean isFlushed() {
-        LOG.debug("Check queue {} for completeness (offset {}, reserve {})", flushOffset, reserve);
+        LOG.debug("Check queue {} for completeness (offset {}, reserve {})", this, flushOffset, reserve);
         if (flushOffset < reserve) {
             return false;
         }
@@ -218,7 +224,7 @@ final class OutboundQueueImpl implements OutboundQueue {
         for (;;) {
             // No message ready
             if (isEmpty()) {
-                LOG.trace("Flushed all reserved entries up to ", flushOffset);
+                LOG.trace("Flushed all reserved entries up to {}", flushOffset);
                 return null;
             }
 
@@ -228,7 +234,7 @@ final class OutboundQueueImpl implements OutboundQueue {
                 return null;
             }
 
-            final OfHeader msg = entry.getMessage();
+            final OfHeader msg = entry.takeMessage();
             flushOffset++;
             if (msg != null) {
                 return msg;