Enable periodic barrier only when needed 79/20579/3
authorRobert Varga <rovarga@cisco.com>
Fri, 15 May 2015 23:38:12 +0000 (01:38 +0200)
committerRobert Varga <rovarga@cisco.com>
Sat, 16 May 2015 12:25:25 +0000 (14:25 +0200)
Instead of rescheduling the timer, track precisely when we need to have
it enabled. This will ensure that idle channels are really idle.

Change-Id: I34f7b2fdc5a7abb9d3c2c612adb307bdf097cf10
Signed-off-by: Robert Varga <rovarga@cisco.com>
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java

index 1bb9ec4bc1cfaf4c000709bc47d9c43b76a48b7a..2d94b679adfd9fa9bfcfb165bf42d189be817bc0 100644 (file)
@@ -21,6 +21,8 @@ final class OutboundQueueImpl implements OutboundQueue {
     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class);
     private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> CURRENT_OFFSET_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
+    private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> BARRIER_OFFSET_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "barrierOffset");
     private static final long FLUSH_RETRY_NANOS = 1L;
     private final OutboundQueueManager<?> manager;
     private final OutboundQueueEntry[] queue;
@@ -29,7 +31,8 @@ final class OutboundQueueImpl implements OutboundQueue {
     private final int reserve;
 
     // Updated concurrently
-    private volatile int reserveOffset;
+    private volatile int barrierOffset = -1;
+    private volatile int reserveOffset = 0;
 
     // Updated from Netty only
     private int flushOffset;
@@ -69,7 +72,40 @@ final class OutboundQueueImpl implements OutboundQueue {
         return new OutboundQueueImpl(manager, baseXid, queue);
     }
 
-    Long reserveEntry(final boolean forBarrier) {
+    @Override
+    public Long reserveEntry() {
+        return reserveEntry(false);
+    }
+
+    @Override
+    public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
+        final int offset = (int)(xid - baseXid);
+        if (message != null) {
+            Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
+        }
+
+        final OutboundQueueEntry entry = queue[offset];
+        entry.commit(message, callback);
+        LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
+
+        if (entry.isBarrier()) {
+            int my = offset;
+            for (;;) {
+                final int prev = BARRIER_OFFSET_UPDATER.getAndSet(this, my);
+                if (prev < my) {
+                    LOG.debug("Queue {} recorded pending barrier offset {}", this, my);
+                    break;
+                }
+
+                // We have traveled back, recover
+                my = prev;
+            }
+        }
+
+        manager.ensureFlushing(this);
+    }
+
+    private Long reserveEntry(final boolean forBarrier) {
         final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this);
         if (offset >= reserve) {
             if (forBarrier) {
@@ -86,22 +122,14 @@ final class OutboundQueueImpl implements OutboundQueue {
         return xid;
     }
 
-    @Override
-    public Long reserveEntry() {
-        return reserveEntry(false);
-    }
-
-    @Override
-    public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
-        final int offset = (int)(xid - baseXid);
-        if (message != null) {
-            Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
+    Long reserveBarrierIfNeeded() {
+        final int bo = barrierOffset;
+        if (bo >= flushOffset) {
+            LOG.debug("Barrier found at offset {} (currently at {})", bo, flushOffset);
+            return null;
+        } else {
+            return reserveEntry(true);
         }
-
-        queue[offset].commit(message, callback);
-        LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
-
-        manager.ensureFlushing(this);
     }
 
     /**
index b7c2c8f9efd57511e03964082dd89855a7dd9775..43cbf02792bfc45ae62b4fd917b688313ee3454f 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import java.net.InetSocketAddress;
@@ -18,7 +17,7 @@ import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
@@ -49,6 +48,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
 
     private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
     private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
+    private final AtomicBoolean flushScheduled = new AtomicBoolean();
     private final ConnectionAdapterImpl parent;
     private final InetSocketAddress address;
     private final long maxBarrierNanos;
@@ -56,18 +56,10 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     private final int queueSize;
     private final T handler;
 
-    /*
-     * Instead of using an AtomicBoolean object, we use these two. It saves us
-     * from allocating an extra object.
-     */
-    @SuppressWarnings("rawtypes")
-    private static final AtomicIntegerFieldUpdater<OutboundQueueManager> FLUSH_SCHEDULED_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(OutboundQueueManager.class, "flushScheduled");
-    private volatile int flushScheduled = 0;
-
     // Updated from netty only
     private long lastBarrierNanos = System.nanoTime();
     private OutboundQueueImpl currentQueue;
+    private boolean barrierTimerEnabled;
     private int nonBarrierMessages;
     private long lastXid = 0;
 
@@ -98,7 +90,6 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
 
         LOG.debug("Queue manager instantiated with queue size {}", queueSize);
         createQueue();
-        scheduleBarrierTimer(lastBarrierNanos);
     }
 
     T getHandler() {
@@ -147,21 +138,18 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         final long delay = next - now;
         LOG.debug("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
         parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
+        barrierTimerEnabled = true;
     }
 
     private void scheduleBarrierMessage() {
-        final Long xid = currentQueue.reserveEntry(true);
-        Verify.verifyNotNull(xid);
+        final Long xid = currentQueue.reserveBarrierIfNeeded();
+        if (xid == null) {
+            LOG.debug("Queue {} already contains a barrier, not scheduling one", currentQueue);
+            return;
+        }
 
         currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
         LOG.debug("Barrier XID {} scheduled", xid);
-
-        // We can see into the future when compared to flushEntry(), as that
-        // codepath may be lagging behind on messages. Resetting the counter
-        // here ensures that flushEntry() will not attempt to issue a flush
-        // request. Note that we do not reset current time, as that should
-        // reflect when we sent the message for real.
-        nonBarrierMessages = 0;
     }
 
     /**
@@ -196,6 +184,8 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             if (nonBarrierMessages >= queueSize) {
                 LOG.debug("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
                 scheduleBarrierMessage();
+            } else if (!barrierTimerEnabled) {
+                scheduleBarrierTimer(now);
             }
         }
 
@@ -260,7 +250,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
 
     private void scheduleFlush() {
         if (parent.getChannel().isWritable()) {
-            if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
+            if (flushScheduled.compareAndSet(false, true)) {
                 LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
                 parent.getChannel().eventLoop().execute(flushRunnable);
             } else {
@@ -281,6 +271,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
      */
     protected void barrier() {
         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
+        barrierTimerEnabled = false;
         if (currentQueue == null) {
             LOG.debug("Channel shut down, not processing barrier");
             return;
@@ -297,8 +288,6 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
                 scheduleBarrierMessage();
             }
         }
-
-        scheduleBarrierTimer(now);
     }
 
     /**
@@ -363,7 +352,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
          * flush out is needed. That will re-synchronized with other threads
          * such that only one flush is scheduled at any given time.
          */
-        if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
+        if (!flushScheduled.compareAndSet(true, false)) {
             LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
         }
 
@@ -419,6 +408,6 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
 
     @Override
     public String toString() {
-        return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled);
+        return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
     }
 }