Fix precondition format string
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueImpl.java
index 6713d5ceab000eb5df956833381dfd4af640674e..c9c28c16769cdbf4a498bf92f37de0e1ea447971 100644 (file)
@@ -87,9 +87,12 @@ final class OutboundQueueImpl implements OutboundQueue {
             Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
         }
 
+        final int ro = reserveOffset;
+        Preconditions.checkArgument(offset < ro, "Unexpected commit to offset %s reserved %s message %s", offset, ro, message);
+
         final OutboundQueueEntry entry = queue[offset];
         entry.commit(message, callback);
-        LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
+        LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, ro);
 
         if (entry.isBarrier()) {
             int my = offset;
@@ -135,12 +138,30 @@ final class OutboundQueueImpl implements OutboundQueue {
         }
     }
 
+    int startShutdown() {
+        // Increment the offset by the queue size, hence preventing any normal
+        // allocations. We should not be seeing a barrier reservation after this
+        // and if there is one issued, we can disregard it.
+        final int offset = CURRENT_OFFSET_UPDATER.getAndAdd(this, queue.length);
+
+        // If this offset is larger than reserve, trim it. That is not an accurate
+        // view of which slot was actually "reserved", but it indicates at which
+        // entry we can declare the queue flushed (e.g. at the emergency slot).
+        return offset > reserve ? reserve : offset;
+    }
+
+    boolean isShutdown(final int offset) {
+        // This queue is shutdown if the flushOffset (e.g. the next entry to
+        // be flushed) points to the offset 'reserved' in startShutdown()
+        return flushOffset >= offset;
+    }
+
     /**
      * An empty queue is a queue which has no further unflushed entries.
      *
      * @return True if this queue does not have unprocessed entries.
      */
-    boolean isEmpty() {
+    private boolean isEmpty() {
         int ro = reserveOffset;
         if (ro >= reserve) {
             if (queue[reserve].isCommitted()) {
@@ -170,7 +191,7 @@ final class OutboundQueueImpl implements OutboundQueue {
     }
 
     boolean isFlushed() {
-        LOG.debug("Check queue {} for completeness (offset {}, reserve {})", flushOffset, reserve);
+        LOG.debug("Check queue {} for completeness (offset {}, reserve {})", this, flushOffset, reserve);
         if (flushOffset < reserve) {
             return false;
         }
@@ -179,11 +200,28 @@ final class OutboundQueueImpl implements OutboundQueue {
         return flushOffset >= queue.length || !queue[reserve].isCommitted();
     }
 
+    boolean needsFlush() {
+        if (flushOffset < reserve) {
+            return queue[flushOffset].isCommitted();
+        }
+
+        if (isFlushed()) {
+            LOG.trace("Queue {} is flushed, schedule a replace", this);
+            return true;
+        }
+        if (isFinished()) {
+            LOG.trace("Queue {} is finished, schedule a cleanup", this);
+            return true;
+        }
+
+        return false;
+    }
+
     OfHeader flushEntry() {
         for (;;) {
             // No message ready
             if (isEmpty()) {
-                LOG.trace("Flushed all reserved entries up to ", flushOffset);
+                LOG.trace("Flushed all reserved entries up to {}", flushOffset);
                 return null;
             }