Enable periodic barrier only when needed
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueImpl.java
index 1bb9ec4bc1cfaf4c000709bc47d9c43b76a48b7a..2d94b679adfd9fa9bfcfb165bf42d189be817bc0 100644 (file)
@@ -21,6 +21,8 @@ final class OutboundQueueImpl implements OutboundQueue {
     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class);
     private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> CURRENT_OFFSET_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
+    private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> BARRIER_OFFSET_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "barrierOffset");
     private static final long FLUSH_RETRY_NANOS = 1L;
     private final OutboundQueueManager<?> manager;
     private final OutboundQueueEntry[] queue;
@@ -29,7 +31,8 @@ final class OutboundQueueImpl implements OutboundQueue {
     private final int reserve;
 
     // Updated concurrently
-    private volatile int reserveOffset;
+    private volatile int barrierOffset = -1;
+    private volatile int reserveOffset = 0;
 
     // Updated from Netty only
     private int flushOffset;
@@ -69,7 +72,40 @@ final class OutboundQueueImpl implements OutboundQueue {
         return new OutboundQueueImpl(manager, baseXid, queue);
     }
 
-    Long reserveEntry(final boolean forBarrier) {
+    @Override
+    public Long reserveEntry() {
+        return reserveEntry(false);
+    }
+
+    @Override
+    public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
+        final int offset = (int)(xid - baseXid);
+        if (message != null) {
+            Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
+        }
+
+        final OutboundQueueEntry entry = queue[offset];
+        entry.commit(message, callback);
+        LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
+
+        if (entry.isBarrier()) {
+            int my = offset;
+            for (;;) {
+                final int prev = BARRIER_OFFSET_UPDATER.getAndSet(this, my);
+                if (prev < my) {
+                    LOG.debug("Queue {} recorded pending barrier offset {}", this, my);
+                    break;
+                }
+
+                // We have traveled back, recover
+                my = prev;
+            }
+        }
+
+        manager.ensureFlushing(this);
+    }
+
+    private Long reserveEntry(final boolean forBarrier) {
         final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this);
         if (offset >= reserve) {
             if (forBarrier) {
@@ -86,22 +122,14 @@ final class OutboundQueueImpl implements OutboundQueue {
         return xid;
     }
 
-    @Override
-    public Long reserveEntry() {
-        return reserveEntry(false);
-    }
-
-    @Override
-    public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
-        final int offset = (int)(xid - baseXid);
-        if (message != null) {
-            Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
+    Long reserveBarrierIfNeeded() {
+        final int bo = barrierOffset;
+        if (bo >= flushOffset) {
+            LOG.debug("Barrier found at offset {} (currently at {})", bo, flushOffset);
+            return null;
+        } else {
+            return reserveEntry(true);
         }
-
-        queue[offset].commit(message, callback);
-        LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
-
-        manager.ensureFlushing(this);
     }
 
     /**