Enable periodic barrier only when needed
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
index b7c2c8f9efd57511e03964082dd89855a7dd9775..43cbf02792bfc45ae62b4fd917b688313ee3454f 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import java.net.InetSocketAddress;
@@ -18,7 +17,7 @@ import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
@@ -49,6 +48,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
 
     private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
     private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
+    private final AtomicBoolean flushScheduled = new AtomicBoolean();
     private final ConnectionAdapterImpl parent;
     private final InetSocketAddress address;
     private final long maxBarrierNanos;
@@ -56,18 +56,10 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     private final int queueSize;
     private final T handler;
 
-    /*
-     * Instead of using an AtomicBoolean object, we use these two. It saves us
-     * from allocating an extra object.
-     */
-    @SuppressWarnings("rawtypes")
-    private static final AtomicIntegerFieldUpdater<OutboundQueueManager> FLUSH_SCHEDULED_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(OutboundQueueManager.class, "flushScheduled");
-    private volatile int flushScheduled = 0;
-
     // Updated from netty only
     private long lastBarrierNanos = System.nanoTime();
     private OutboundQueueImpl currentQueue;
+    private boolean barrierTimerEnabled;
     private int nonBarrierMessages;
     private long lastXid = 0;
 
@@ -98,7 +90,6 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
 
         LOG.debug("Queue manager instantiated with queue size {}", queueSize);
         createQueue();
-        scheduleBarrierTimer(lastBarrierNanos);
     }
 
     T getHandler() {
@@ -147,21 +138,18 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         final long delay = next - now;
         LOG.debug("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
         parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
+        barrierTimerEnabled = true;
     }
 
     private void scheduleBarrierMessage() {
-        final Long xid = currentQueue.reserveEntry(true);
-        Verify.verifyNotNull(xid);
+        final Long xid = currentQueue.reserveBarrierIfNeeded();
+        if (xid == null) {
+            LOG.debug("Queue {} already contains a barrier, not scheduling one", currentQueue);
+            return;
+        }
 
         currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
         LOG.debug("Barrier XID {} scheduled", xid);
-
-        // We can see into the future when compared to flushEntry(), as that
-        // codepath may be lagging behind on messages. Resetting the counter
-        // here ensures that flushEntry() will not attempt to issue a flush
-        // request. Note that we do not reset current time, as that should
-        // reflect when we sent the message for real.
-        nonBarrierMessages = 0;
     }
 
     /**
@@ -196,6 +184,8 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             if (nonBarrierMessages >= queueSize) {
                 LOG.debug("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
                 scheduleBarrierMessage();
+            } else if (!barrierTimerEnabled) {
+                scheduleBarrierTimer(now);
             }
         }
 
@@ -260,7 +250,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
 
     private void scheduleFlush() {
         if (parent.getChannel().isWritable()) {
-            if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
+            if (flushScheduled.compareAndSet(false, true)) {
                 LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
                 parent.getChannel().eventLoop().execute(flushRunnable);
             } else {
@@ -281,6 +271,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
      */
     protected void barrier() {
         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
+        barrierTimerEnabled = false;
         if (currentQueue == null) {
             LOG.debug("Channel shut down, not processing barrier");
             return;
@@ -297,8 +288,6 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
                 scheduleBarrierMessage();
             }
         }
-
-        scheduleBarrierTimer(now);
     }
 
     /**
@@ -363,7 +352,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
          * flush out is needed. That will re-synchronized with other threads
          * such that only one flush is scheduled at any given time.
          */
-        if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
+        if (!flushScheduled.compareAndSet(true, false)) {
             LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
         }
 
@@ -419,6 +408,6 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
 
     @Override
     public String toString() {
-        return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled);
+        return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
     }
 }