From d61ef13aa0016d99bb07e27e8e2fe094e805809e Mon Sep 17 00:00:00 2001 From: Vaclav Demcak Date: Thu, 8 Oct 2015 03:30:45 +0200 Subject: [PATCH] Barrier turn on/off - no Barrier pipeline * impl noBarrier OutboundQueueManager and StackedOutboundQueue Change-Id: I02abcd3618337a9a9373eb59f98565ce61d90ad6 Signed-off-by: Vaclav Demcak (cherry picked from commit 64aebad95b27a6f903a7f65e419cd41df1816c90) --- .../AbstractOutboundQueueManager.java | 2 +- .../AbstractStackedOutboundQueue.java | 76 +++++++++++- .../connection/ConnectionAdapterImpl.java | 9 +- .../core/connection/OutboundQueueEntry.java | 12 +- .../OutboundQueueManagerNoBarrier.java | 30 +++++ .../core/connection/StackedOutboundQueue.java | 80 +----------- .../StackedOutboundQueueNoBarrier.java | 114 ++++++++++++++++++ .../impl/core/connection/StackedSegment.java | 16 ++- 8 files changed, 248 insertions(+), 91 deletions(-) create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManagerNoBarrier.java create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueueNoBarrier.java diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java index 99bec867..520d145d 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java @@ -258,7 +258,7 @@ abstract class AbstractOutboundQueueManager it = uncompletedSegments.iterator(); + while (it.hasNext()) { + final StackedSegment queue = it.next(); + final OutboundQueueEntry entry = queue.pairRequest(message); + if (entry == null) { + continue; + } + + LOG.trace("Queue {} accepted response {}", queue, message); + + // This has been a barrier request, we need to flush all + // previous queues + if (entry.isBarrier() && uncompletedSegments.size() > 1) { + LOG.trace("Queue {} indicated request was a barrier", queue); + + it = uncompletedSegments.iterator(); + while (it.hasNext()) { + final StackedSegment q = it.next(); + + // We want to complete all queues before the current one, we will + // complete the current queue below + if (!queue.equals(q)) { + LOG.trace("Queue {} is implied finished", q); + q.completeAll(); + it.remove(); + q.recycle(); + } else { + break; + } + } + } + + if (queue.isComplete()) { + LOG.trace("Queue {} is finished", queue); + it.remove(); + queue.recycle(); + } + + return true; + } + + LOG.debug("Failed to find completion for message {}", message); + return false; + } boolean needsFlush() { // flushOffset always points to the first entry, which can be changed only @@ -228,6 +272,36 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { return !needsFlush(); } + protected OutboundQueueEntry getEntry(final Long xid) { + final StackedSegment fastSegment = firstSegment; + final long calcOffset = xid - fastSegment.getBaseXid(); + Preconditions.checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s", xid, fastSegment.getBaseXid()); + + Verify.verify(calcOffset <= Integer.MAX_VALUE); + final int fastOffset = (int) calcOffset; + + if (fastOffset >= StackedSegment.SEGMENT_SIZE) { + LOG.debug("Queue {} falling back to slow commit of XID {} at offset {}", this, xid, fastOffset); + + final StackedSegment segment; + final int slowOffset; + synchronized (unflushedSegments) { + final StackedSegment slowSegment = firstSegment; + final long slowCalcOffset = xid - slowSegment.getBaseXid(); + Verify.verify(slowCalcOffset >= 0 && slowCalcOffset <= Integer.MAX_VALUE); + slowOffset = (int) slowCalcOffset; + + LOG.debug("Queue {} recalculated offset of XID {} to {}", this, xid, slowOffset); + segment = unflushedSegments.get(slowOffset / StackedSegment.SEGMENT_SIZE); + } + + final int segOffset = slowOffset % StackedSegment.SEGMENT_SIZE; + LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this, xid, slowOffset, segment, segOffset); + return segment.getEntry(segOffset); + } + return fastSegment.getEntry(fastOffset); + } + @GuardedBy("unflushedSegments") private long lockedShutdownFlush() { long entries = 0; diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java index d22f2f06..37635c03 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java @@ -47,7 +47,7 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i private ConnectionReadyListener connectionReadyListener; private OpenflowProtocolListener messageListener; private SystemNotificationsListener systemListener; - private OutboundQueueManager outputManager; + private AbstractOutboundQueueManager outputManager; private OFVersionDetector versionDetector; private final boolean useBarrier; @@ -192,11 +192,14 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i final T handler, final int maxQueueDepth, final long maxBarrierNanos) { Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager); + final AbstractOutboundQueueManager ret; if (useBarrier) { - + ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos); + } else { + LOG.warn("OutboundQueueManager without barrier is started."); + ret = new OutboundQueueManagerNoBarrier<>(this, address, handler); } - final OutboundQueueManager ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos); outputManager = ret; /* we don't need it anymore */ channel.pipeline().remove(output); diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java index a97a50e9..70900cad 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java @@ -58,15 +58,21 @@ final class OutboundQueueEntry { OfHeader takeMessage() { final OfHeader ret = message; - checkCompletionNeed(); + if (!barrier) { + checkCompletionNeed(); + } message = null; return ret; } private void checkCompletionNeed() { - if (callback == null || PacketOutInput.class.isInstance(message)) { + if (callback == null || (message instanceof PacketOutInput)) { completed = true; - callback = null; + if (callback != null) { + callback.onSuccess(null); + callback = null; + } + committed = false; } } diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManagerNoBarrier.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManagerNoBarrier.java new file mode 100644 index 00000000..1fc51473 --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManagerNoBarrier.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowjava.protocol.impl.core.connection; + +import java.net.InetSocketAddress; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler; + +/** + * + * @param + */ +public class OutboundQueueManagerNoBarrier extends + AbstractOutboundQueueManager { + + OutboundQueueManagerNoBarrier(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler) { + super(parent, address, handler); + } + + @Override + protected StackedOutboundQueueNoBarrier initializeStackedOutboudnqueue() { + return new StackedOutboundQueueNoBarrier(this); + } + +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java index f19d9aa1..a9876d99 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java @@ -7,10 +7,7 @@ */ package org.opendaylight.openflowjava.protocol.impl.core.connection; -import com.google.common.base.Preconditions; -import com.google.common.base.Verify; import com.google.common.util.concurrent.FutureCallback; -import java.util.Iterator; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.slf4j.Logger; @@ -31,35 +28,7 @@ final class StackedOutboundQueue extends AbstractStackedOutboundQueue { */ @Override public void commitEntry(final Long xid, final OfHeader message, final FutureCallback callback) { - final StackedSegment fastSegment = firstSegment; - final long calcOffset = xid - fastSegment.getBaseXid(); - Preconditions.checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s", xid, fastSegment.getBaseXid()); - - Verify.verify(calcOffset <= Integer.MAX_VALUE); - final int fastOffset = (int) calcOffset; - - final OutboundQueueEntry entry; - if (fastOffset >= StackedSegment.SEGMENT_SIZE) { - LOG.debug("Queue {} falling back to slow commit of XID {} at offset {}", this, xid, fastOffset); - - final StackedSegment segment; - final int slowOffset; - synchronized (unflushedSegments) { - final StackedSegment slowSegment = firstSegment; - final long slowCalcOffset = xid - slowSegment.getBaseXid(); - Verify.verify(slowCalcOffset >= 0 && slowCalcOffset <= Integer.MAX_VALUE); - slowOffset = (int) slowCalcOffset; - - LOG.debug("Queue {} recalculated offset of XID {} to {}", this, xid, slowOffset); - segment = unflushedSegments.get(slowOffset / StackedSegment.SEGMENT_SIZE); - } - - final int segOffset = slowOffset % StackedSegment.SEGMENT_SIZE; - entry = segment.getEntry(segOffset); - LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this, xid, slowOffset, segment, segOffset); - } else { - entry = fastSegment.getEntry(fastOffset); - } + final OutboundQueueEntry entry = getEntry(xid); entry.commit(message, callback); if (entry.isBarrier()) { @@ -90,51 +59,4 @@ final class StackedOutboundQueue extends AbstractStackedOutboundQueue { } return reserveEntry(); } - - @Override - boolean pairRequest(final OfHeader message) { - Iterator it = uncompletedSegments.iterator(); - while (it.hasNext()) { - final StackedSegment queue = it.next(); - final OutboundQueueEntry entry = queue.pairRequest(message); - if (entry == null) { - continue; - } - - LOG.trace("Queue {} accepted response {}", queue, message); - - // This has been a barrier request, we need to flush all - // previous queues - if (entry.isBarrier() && uncompletedSegments.size() > 1) { - LOG.trace("Queue {} indicated request was a barrier", queue); - - it = uncompletedSegments.iterator(); - while (it.hasNext()) { - final StackedSegment q = it.next(); - - // We want to complete all queues before the current one, we will - // complete the current queue below - if (!queue.equals(q)) { - LOG.trace("Queue {} is implied finished", q); - q.completeAll(); - it.remove(); - q.recycle(); - } else { - break; - } - } - } - - if (queue.isComplete()) { - LOG.trace("Queue {} is finished", queue); - it.remove(); - queue.recycle(); - } - - return true; - } - - LOG.debug("Failed to find completion for message {}", message); - return false; - } } diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueueNoBarrier.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueueNoBarrier.java new file mode 100644 index 00000000..2917631a --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueueNoBarrier.java @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowjava.protocol.impl.core.connection; + +import com.google.common.util.concurrent.FutureCallback; +import io.netty.channel.Channel; +import javax.annotation.Nonnull; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class is designed for stacking Statistics and propagate immediate response for all + * another requests. + */ +public class StackedOutboundQueueNoBarrier extends AbstractStackedOutboundQueue { + + private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueueNoBarrier.class); + + StackedOutboundQueueNoBarrier(final AbstractOutboundQueueManager manager) { + super(manager); + } + + /* + * This method is expected to be called from multiple threads concurrently + */ + @Override + public void commitEntry(final Long xid, final OfHeader message, final FutureCallback callback) { + final OutboundQueueEntry entry = getEntry(xid); + + if (message instanceof FlowModInput) { + callback.onSuccess(null); + entry.commit(message, null); + } else { + entry.commit(message, callback); + } + + LOG.trace("Queue {} committed XID {}", this, xid); + manager.ensureFlushing(); + } + + @Override + int writeEntries(@Nonnull final Channel channel, final long now) { + // Local cache + StackedSegment segment = firstSegment; + int entries = 0; + + while (channel.isWritable()) { + final OutboundQueueEntry entry = segment.getEntry(flushOffset); + if (!entry.isCommitted()) { + LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid() + + flushOffset, segment, flushOffset); + break; + } + + LOG.trace("Queue {} flushing entry at offset {}", this, flushOffset); + final OfHeader message = entry.takeMessage(); + flushOffset++; + entries++; + + if (message != null) { + manager.writeMessage(message, now); + } else { + entry.complete(null); + } + + if (flushOffset >= StackedSegment.SEGMENT_SIZE) { + /* + * Slow path: purge the current segment unless it's the last one. + * If it is, we leave it for replacement when a new reservation + * is run on it. + * This costs us two slow paths, but hey, this should be very rare, + * so let's keep things simple. + */ + synchronized (unflushedSegments) { + LOG.debug("Flush offset {} unflushed segments {}", flushOffset, unflushedSegments.size()); + + // We may have raced ahead of reservation code and need to allocate a segment + ensureSegment(segment, flushOffset); + + // Remove the segment, update the firstSegment and reset flushOffset + final StackedSegment oldSegment = unflushedSegments.remove(0); + oldSegment.completeAll(); + uncompletedSegments.remove(oldSegment); + oldSegment.recycle(); + + // Reset the first segment and add it to the uncompleted list + segment = unflushedSegments.get(0); + uncompletedSegments.add(segment); + + // Update the shutdown offset + if (shutdownOffset != null) { + shutdownOffset -= StackedSegment.SEGMENT_SIZE; + } + + // Allow reservations back on the fast path by publishing the new first segment + firstSegment = segment; + + flushOffset = 0; + LOG.debug("Queue {} flush moved to segment {}", this, segment); + } + } + } + + return entries; + } +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedSegment.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedSegment.java index b9030b81..c971c663 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedSegment.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedSegment.java @@ -109,9 +109,17 @@ final class StackedSegment { LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(), err.getCodeString()); entry.fail(new DeviceRequestFailedException("Device-side failure", err)); return true; - } else { - return entry.complete(response); } + return entry.complete(response); + } + + OutboundQueueEntry findEntry(final long xid) { + if (! xidInRange(xid)) { + LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, entries.length, xid); + return null; + } + final int offset = (int)(xid - baseXid); + return entries[offset]; } OutboundQueueEntry pairRequest(final OfHeader response) { @@ -122,7 +130,7 @@ final class StackedSegment { return null; } - final int offset = (int)(xid - baseXid); + final int offset = (int) (xid - baseXid); final OutboundQueueEntry entry = entries[offset]; if (entry.isCompleted()) { LOG.debug("Entry {} already is completed, not accepting response {}", entry, response); @@ -184,7 +192,7 @@ final class StackedSegment { } void recycle() { - for (OutboundQueueEntry e : entries) { + for (final OutboundQueueEntry e : entries) { e.reset(); } -- 2.36.6