From: Robert Varga Date: Fri, 15 May 2015 23:38:12 +0000 (+0200) Subject: Enable periodic barrier only when needed X-Git-Tag: release/lithium~28 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=096d3de0bc39bef5bb0493e721d794e850a5b49d;p=openflowjava.git Enable periodic barrier only when needed Instead of rescheduling the timer, track precisely when we need to have it enabled. This will ensure that idle channels are really idle. Change-Id: I34f7b2fdc5a7abb9d3c2c612adb307bdf097cf10 Signed-off-by: Robert Varga --- diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java index 1bb9ec4b..2d94b679 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java @@ -21,6 +21,8 @@ final class OutboundQueueImpl implements OutboundQueue { private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class); private static final AtomicIntegerFieldUpdater CURRENT_OFFSET_UPDATER = AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset"); + private static final AtomicIntegerFieldUpdater BARRIER_OFFSET_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "barrierOffset"); private static final long FLUSH_RETRY_NANOS = 1L; private final OutboundQueueManager manager; private final OutboundQueueEntry[] queue; @@ -29,7 +31,8 @@ final class OutboundQueueImpl implements OutboundQueue { private final int reserve; // Updated concurrently - private volatile int reserveOffset; + private volatile int barrierOffset = -1; + private volatile int reserveOffset = 0; // Updated from Netty only private int flushOffset; @@ -69,7 +72,40 @@ final class OutboundQueueImpl implements OutboundQueue { return new OutboundQueueImpl(manager, baseXid, queue); } - Long reserveEntry(final boolean forBarrier) { + @Override + public Long reserveEntry() { + return reserveEntry(false); + } + + @Override + public void commitEntry(final Long xid, final OfHeader message, final FutureCallback callback) { + final int offset = (int)(xid - baseXid); + if (message != null) { + Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid); + } + + final OutboundQueueEntry entry = queue[offset]; + entry.commit(message, callback); + LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset); + + if (entry.isBarrier()) { + int my = offset; + for (;;) { + final int prev = BARRIER_OFFSET_UPDATER.getAndSet(this, my); + if (prev < my) { + LOG.debug("Queue {} recorded pending barrier offset {}", this, my); + break; + } + + // We have traveled back, recover + my = prev; + } + } + + manager.ensureFlushing(this); + } + + private Long reserveEntry(final boolean forBarrier) { final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this); if (offset >= reserve) { if (forBarrier) { @@ -86,22 +122,14 @@ final class OutboundQueueImpl implements OutboundQueue { return xid; } - @Override - public Long reserveEntry() { - return reserveEntry(false); - } - - @Override - public void commitEntry(final Long xid, final OfHeader message, final FutureCallback callback) { - final int offset = (int)(xid - baseXid); - if (message != null) { - Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid); + Long reserveBarrierIfNeeded() { + final int bo = barrierOffset; + if (bo >= flushOffset) { + LOG.debug("Barrier found at offset {} (currently at {})", bo, flushOffset); + return null; + } else { + return reserveEntry(true); } - - queue[offset].commit(message, callback); - LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset); - - manager.ensureFlushing(this); } /** diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java index b7c2c8f9..43cbf027 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java @@ -8,7 +8,6 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection; import com.google.common.base.Preconditions; -import com.google.common.base.Verify; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.net.InetSocketAddress; @@ -18,7 +17,7 @@ import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicBoolean; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; @@ -49,6 +48,7 @@ final class OutboundQueueManager extends Channel private final Queue queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE); private final Queue activeQueues = new LinkedList<>(); + private final AtomicBoolean flushScheduled = new AtomicBoolean(); private final ConnectionAdapterImpl parent; private final InetSocketAddress address; private final long maxBarrierNanos; @@ -56,18 +56,10 @@ final class OutboundQueueManager extends Channel private final int queueSize; private final T handler; - /* - * Instead of using an AtomicBoolean object, we use these two. It saves us - * from allocating an extra object. - */ - @SuppressWarnings("rawtypes") - private static final AtomicIntegerFieldUpdater FLUSH_SCHEDULED_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(OutboundQueueManager.class, "flushScheduled"); - private volatile int flushScheduled = 0; - // Updated from netty only private long lastBarrierNanos = System.nanoTime(); private OutboundQueueImpl currentQueue; + private boolean barrierTimerEnabled; private int nonBarrierMessages; private long lastXid = 0; @@ -98,7 +90,6 @@ final class OutboundQueueManager extends Channel LOG.debug("Queue manager instantiated with queue size {}", queueSize); createQueue(); - scheduleBarrierTimer(lastBarrierNanos); } T getHandler() { @@ -147,21 +138,18 @@ final class OutboundQueueManager extends Channel final long delay = next - now; LOG.debug("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay)); parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS); + barrierTimerEnabled = true; } private void scheduleBarrierMessage() { - final Long xid = currentQueue.reserveEntry(true); - Verify.verifyNotNull(xid); + final Long xid = currentQueue.reserveBarrierIfNeeded(); + if (xid == null) { + LOG.debug("Queue {} already contains a barrier, not scheduling one", currentQueue); + return; + } currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null); LOG.debug("Barrier XID {} scheduled", xid); - - // We can see into the future when compared to flushEntry(), as that - // codepath may be lagging behind on messages. Resetting the counter - // here ensures that flushEntry() will not attempt to issue a flush - // request. Note that we do not reset current time, as that should - // reflect when we sent the message for real. - nonBarrierMessages = 0; } /** @@ -196,6 +184,8 @@ final class OutboundQueueManager extends Channel if (nonBarrierMessages >= queueSize) { LOG.debug("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages); scheduleBarrierMessage(); + } else if (!barrierTimerEnabled) { + scheduleBarrierTimer(now); } } @@ -260,7 +250,7 @@ final class OutboundQueueManager extends Channel private void scheduleFlush() { if (parent.getChannel().isWritable()) { - if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) { + if (flushScheduled.compareAndSet(false, true)) { LOG.trace("Scheduling flush task on channel {}", parent.getChannel()); parent.getChannel().eventLoop().execute(flushRunnable); } else { @@ -281,6 +271,7 @@ final class OutboundQueueManager extends Channel */ protected void barrier() { LOG.debug("Channel {} barrier timer expired", parent.getChannel()); + barrierTimerEnabled = false; if (currentQueue == null) { LOG.debug("Channel shut down, not processing barrier"); return; @@ -297,8 +288,6 @@ final class OutboundQueueManager extends Channel scheduleBarrierMessage(); } } - - scheduleBarrierTimer(now); } /** @@ -363,7 +352,7 @@ final class OutboundQueueManager extends Channel * flush out is needed. That will re-synchronized with other threads * such that only one flush is scheduled at any given time. */ - if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) { + if (!flushScheduled.compareAndSet(true, false)) { LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this); } @@ -419,6 +408,6 @@ final class OutboundQueueManager extends Channel @Override public String toString() { - return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled); + return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get()); } }