Barrier turn on/off - no Barrier pipeline 38/28038/13
authorVaclav Demcak <vdemcak@cisco.com>
Thu, 8 Oct 2015 01:30:45 +0000 (03:30 +0200)
committerVaclav Demcak <vdemcak@cisco.com>
Tue, 3 Nov 2015 10:07:08 +0000 (11:07 +0100)
* impl noBarrier OutboundQueueManager and StackedOutboundQueue

Change-Id: I02abcd3618337a9a9373eb59f98565ce61d90ad6
Signed-off-by: Vaclav Demcak <vdemcak@cisco.com>
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManagerNoBarrier.java [new file with mode: 0644]
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueueNoBarrier.java [new file with mode: 0644]
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedSegment.java

index 99bec867e582ae60c2996c67a483ac014c53b679..520d145d87523872d422597c096e24487fdbf164 100644 (file)
@@ -258,7 +258,7 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O ex
      *
      * @return
      */
-    private Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
+    protected Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
         Preconditions.checkArgument(msg != null);
 
         if (address == null) {
index 77cb688d462afa544e33c554d6822a69d1df99a3..54e779eaa26b2354323487ee9fa3c9e804a44683 100644 (file)
@@ -184,7 +184,51 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue {
         return entries;
     }
 
-    abstract boolean pairRequest(final OfHeader message);
+    boolean pairRequest(final OfHeader message) {
+        Iterator<StackedSegment> it = uncompletedSegments.iterator();
+        while (it.hasNext()) {
+            final StackedSegment queue = it.next();
+            final OutboundQueueEntry entry = queue.pairRequest(message);
+            if (entry == null) {
+                continue;
+            }
+
+            LOG.trace("Queue {} accepted response {}", queue, message);
+
+            // This has been a barrier request, we need to flush all
+            // previous queues
+            if (entry.isBarrier() && uncompletedSegments.size() > 1) {
+                LOG.trace("Queue {} indicated request was a barrier", queue);
+
+                it = uncompletedSegments.iterator();
+                while (it.hasNext()) {
+                    final StackedSegment q = it.next();
+
+                    // We want to complete all queues before the current one, we will
+                    // complete the current queue below
+                    if (!queue.equals(q)) {
+                        LOG.trace("Queue {} is implied finished", q);
+                        q.completeAll();
+                        it.remove();
+                        q.recycle();
+                    } else {
+                        break;
+                    }
+                }
+            }
+
+            if (queue.isComplete()) {
+                LOG.trace("Queue {} is finished", queue);
+                it.remove();
+                queue.recycle();
+            }
+
+            return true;
+        }
+
+        LOG.debug("Failed to find completion for message {}", message);
+        return false;
+    }
 
     boolean needsFlush() {
         // flushOffset always points to the first entry, which can be changed only
@@ -228,6 +272,36 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue {
         return !needsFlush();
     }
 
+    protected OutboundQueueEntry getEntry(final Long xid) {
+        final StackedSegment fastSegment = firstSegment;
+        final long calcOffset = xid - fastSegment.getBaseXid();
+        Preconditions.checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s", xid, fastSegment.getBaseXid());
+
+        Verify.verify(calcOffset <= Integer.MAX_VALUE);
+        final int fastOffset = (int) calcOffset;
+
+        if (fastOffset >= StackedSegment.SEGMENT_SIZE) {
+            LOG.debug("Queue {} falling back to slow commit of XID {} at offset {}", this, xid, fastOffset);
+
+            final StackedSegment segment;
+            final int slowOffset;
+            synchronized (unflushedSegments) {
+                final StackedSegment slowSegment = firstSegment;
+                final long slowCalcOffset = xid - slowSegment.getBaseXid();
+                Verify.verify(slowCalcOffset >= 0 && slowCalcOffset <= Integer.MAX_VALUE);
+                slowOffset = (int) slowCalcOffset;
+
+                LOG.debug("Queue {} recalculated offset of XID {} to {}", this, xid, slowOffset);
+                segment = unflushedSegments.get(slowOffset / StackedSegment.SEGMENT_SIZE);
+            }
+
+            final int segOffset = slowOffset % StackedSegment.SEGMENT_SIZE;
+            LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this, xid, slowOffset, segment, segOffset);
+            return segment.getEntry(segOffset);
+        }
+        return fastSegment.getEntry(fastOffset);
+    }
+
     @GuardedBy("unflushedSegments")
     private long lockedShutdownFlush() {
         long entries = 0;
index d22f2f0606033413a53e2930677b71478768739f..37635c033122dbb96a1d67500665ad5806f8b855 100644 (file)
@@ -47,7 +47,7 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i
     private ConnectionReadyListener connectionReadyListener;
     private OpenflowProtocolListener messageListener;
     private SystemNotificationsListener systemListener;
-    private OutboundQueueManager<?> outputManager;
+    private AbstractOutboundQueueManager<?, ?> outputManager;
     private OFVersionDetector versionDetector;
 
     private final boolean useBarrier;
@@ -192,11 +192,14 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i
             final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
         Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
 
+        final AbstractOutboundQueueManager<T, ?> ret;
         if (useBarrier) {
-
+            ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
+        } else {
+            LOG.warn("OutboundQueueManager without barrier is started.");
+            ret = new OutboundQueueManagerNoBarrier<>(this, address, handler);
         }
 
-        final OutboundQueueManager<T> ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
         outputManager = ret;
         /* we don't need it anymore */
         channel.pipeline().remove(output);
index a97a50e9b8d1fb1e3843bfeff61b00f03c9432ff..70900cad5497b30910a664baec6aeaa7b8e2dd77 100644 (file)
@@ -58,15 +58,21 @@ final class OutboundQueueEntry {
 
     OfHeader takeMessage() {
         final OfHeader ret = message;
-        checkCompletionNeed();
+        if (!barrier) {
+            checkCompletionNeed();
+        }
         message = null;
         return ret;
     }
 
     private void checkCompletionNeed() {
-        if (callback == null || PacketOutInput.class.isInstance(message)) {
+        if (callback == null || (message instanceof PacketOutInput)) {
             completed = true;
-            callback = null;
+            if (callback != null) {
+                callback.onSuccess(null);
+                callback = null;
+            }
+            committed = false;
         }
     }
 
diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManagerNoBarrier.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManagerNoBarrier.java
new file mode 100644 (file)
index 0000000..1fc5147
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import java.net.InetSocketAddress;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
+
+/**
+ *
+ * @param <T>
+ */
+public class OutboundQueueManagerNoBarrier<T extends OutboundQueueHandler> extends
+        AbstractOutboundQueueManager<T, StackedOutboundQueueNoBarrier> {
+
+    OutboundQueueManagerNoBarrier(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler) {
+        super(parent, address, handler);
+    }
+
+    @Override
+    protected StackedOutboundQueueNoBarrier initializeStackedOutboudnqueue() {
+        return new StackedOutboundQueueNoBarrier(this);
+    }
+
+}
index f19d9aa134ede3fb944bf0bf941a28717f06a142..a9876d99ec9571c634d17254e427cd886780ea95 100644 (file)
@@ -7,10 +7,7 @@
  */
 package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
 import com.google.common.util.concurrent.FutureCallback;
-import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.slf4j.Logger;
@@ -31,35 +28,7 @@ final class StackedOutboundQueue extends AbstractStackedOutboundQueue {
      */
     @Override
     public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
-        final StackedSegment fastSegment = firstSegment;
-        final long calcOffset = xid - fastSegment.getBaseXid();
-        Preconditions.checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s", xid, fastSegment.getBaseXid());
-
-        Verify.verify(calcOffset <= Integer.MAX_VALUE);
-        final int fastOffset = (int) calcOffset;
-
-        final OutboundQueueEntry entry;
-        if (fastOffset >= StackedSegment.SEGMENT_SIZE) {
-            LOG.debug("Queue {} falling back to slow commit of XID {} at offset {}", this, xid, fastOffset);
-
-            final StackedSegment segment;
-            final int slowOffset;
-            synchronized (unflushedSegments) {
-                final StackedSegment slowSegment = firstSegment;
-                final long slowCalcOffset = xid - slowSegment.getBaseXid();
-                Verify.verify(slowCalcOffset >= 0 && slowCalcOffset <= Integer.MAX_VALUE);
-                slowOffset = (int) slowCalcOffset;
-
-                LOG.debug("Queue {} recalculated offset of XID {} to {}", this, xid, slowOffset);
-                segment = unflushedSegments.get(slowOffset / StackedSegment.SEGMENT_SIZE);
-            }
-
-            final int segOffset = slowOffset % StackedSegment.SEGMENT_SIZE;
-            entry = segment.getEntry(segOffset);
-            LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this, xid, slowOffset, segment, segOffset);
-        } else {
-            entry = fastSegment.getEntry(fastOffset);
-        }
+        final OutboundQueueEntry entry = getEntry(xid);
 
         entry.commit(message, callback);
         if (entry.isBarrier()) {
@@ -90,51 +59,4 @@ final class StackedOutboundQueue extends AbstractStackedOutboundQueue {
         }
         return reserveEntry();
     }
-
-    @Override
-    boolean pairRequest(final OfHeader message) {
-        Iterator<StackedSegment> it = uncompletedSegments.iterator();
-        while (it.hasNext()) {
-            final StackedSegment queue = it.next();
-            final OutboundQueueEntry entry = queue.pairRequest(message);
-            if (entry == null) {
-                continue;
-            }
-
-            LOG.trace("Queue {} accepted response {}", queue, message);
-
-            // This has been a barrier request, we need to flush all
-            // previous queues
-            if (entry.isBarrier() && uncompletedSegments.size() > 1) {
-                LOG.trace("Queue {} indicated request was a barrier", queue);
-
-                it = uncompletedSegments.iterator();
-                while (it.hasNext()) {
-                    final StackedSegment q = it.next();
-
-                    // We want to complete all queues before the current one, we will
-                    // complete the current queue below
-                    if (!queue.equals(q)) {
-                        LOG.trace("Queue {} is implied finished", q);
-                        q.completeAll();
-                        it.remove();
-                        q.recycle();
-                    } else {
-                        break;
-                    }
-                }
-            }
-
-            if (queue.isComplete()) {
-                LOG.trace("Queue {} is finished", queue);
-                it.remove();
-                queue.recycle();
-            }
-
-            return true;
-        }
-
-        LOG.debug("Failed to find completion for message {}", message);
-        return false;
-    }
 }
diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueueNoBarrier.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueueNoBarrier.java
new file mode 100644 (file)
index 0000000..2917631
--- /dev/null
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import com.google.common.util.concurrent.FutureCallback;
+import io.netty.channel.Channel;
+import javax.annotation.Nonnull;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class is designed for stacking Statistics and propagate immediate response for all
+ * another requests.
+ */
+public class StackedOutboundQueueNoBarrier extends AbstractStackedOutboundQueue {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueueNoBarrier.class);
+
+    StackedOutboundQueueNoBarrier(final AbstractOutboundQueueManager<?, ?> manager) {
+        super(manager);
+    }
+
+    /*
+     * This method is expected to be called from multiple threads concurrently
+     */
+    @Override
+    public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
+        final OutboundQueueEntry entry = getEntry(xid);
+
+        if (message instanceof FlowModInput) {
+            callback.onSuccess(null);
+            entry.commit(message, null);
+        } else {
+            entry.commit(message, callback);
+        }
+
+        LOG.trace("Queue {} committed XID {}", this, xid);
+        manager.ensureFlushing();
+    }
+
+    @Override
+    int writeEntries(@Nonnull final Channel channel, final long now) {
+        // Local cache
+        StackedSegment segment = firstSegment;
+        int entries = 0;
+
+        while (channel.isWritable()) {
+            final OutboundQueueEntry entry = segment.getEntry(flushOffset);
+            if (!entry.isCommitted()) {
+                LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid()
+                        + flushOffset, segment, flushOffset);
+                break;
+            }
+
+            LOG.trace("Queue {} flushing entry at offset {}", this, flushOffset);
+            final OfHeader message = entry.takeMessage();
+            flushOffset++;
+            entries++;
+
+            if (message != null) {
+                manager.writeMessage(message, now);
+            } else {
+                entry.complete(null);
+            }
+
+            if (flushOffset >= StackedSegment.SEGMENT_SIZE) {
+                /*
+                 * Slow path: purge the current segment unless it's the last one.
+                 * If it is, we leave it for replacement when a new reservation
+                 * is run on it.
+                 * This costs us two slow paths, but hey, this should be very rare,
+                 * so let's keep things simple.
+                 */
+                synchronized (unflushedSegments) {
+                    LOG.debug("Flush offset {} unflushed segments {}", flushOffset, unflushedSegments.size());
+
+                    // We may have raced ahead of reservation code and need to allocate a segment
+                    ensureSegment(segment, flushOffset);
+
+                    // Remove the segment, update the firstSegment and reset flushOffset
+                    final StackedSegment oldSegment = unflushedSegments.remove(0);
+                    oldSegment.completeAll();
+                    uncompletedSegments.remove(oldSegment);
+                    oldSegment.recycle();
+
+                    // Reset the first segment and add it to the uncompleted list
+                    segment = unflushedSegments.get(0);
+                    uncompletedSegments.add(segment);
+
+                    // Update the shutdown offset
+                    if (shutdownOffset != null) {
+                        shutdownOffset -= StackedSegment.SEGMENT_SIZE;
+                    }
+
+                    // Allow reservations back on the fast path by publishing the new first segment
+                    firstSegment = segment;
+
+                    flushOffset = 0;
+                    LOG.debug("Queue {} flush moved to segment {}", this, segment);
+                }
+            }
+        }
+
+        return entries;
+    }
+}
index b9030b817094c5741830af9aa239545119490831..c971c663af5596537d3e49f430862b858220215f 100644 (file)
@@ -109,9 +109,17 @@ final class StackedSegment {
             LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(), err.getCodeString());
             entry.fail(new DeviceRequestFailedException("Device-side failure", err));
             return true;
-        } else {
-            return entry.complete(response);
         }
+        return entry.complete(response);
+    }
+
+    OutboundQueueEntry findEntry(final long xid) {
+        if (! xidInRange(xid)) {
+            LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, entries.length, xid);
+            return null;
+        }
+        final int offset = (int)(xid - baseXid);
+        return entries[offset];
     }
 
     OutboundQueueEntry pairRequest(final OfHeader response) {
@@ -122,7 +130,7 @@ final class StackedSegment {
             return null;
         }
 
-        final int offset = (int)(xid - baseXid);
+        final int offset = (int) (xid - baseXid);
         final OutboundQueueEntry entry = entries[offset];
         if (entry.isCompleted()) {
             LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
@@ -184,7 +192,7 @@ final class StackedSegment {
     }
 
     void recycle() {
-        for (OutboundQueueEntry e : entries) {
+        for (final OutboundQueueEntry e : entries) {
             e.reset();
         }