Barrier turn on/off - StackedOutboundQueue definition 98/27498/6
authorVaclav Demcak <vdemcak@cisco.com>
Sat, 26 Sep 2015 23:55:53 +0000 (01:55 +0200)
committerVaclav Demcak <vdemcak@cisco.com>
Tue, 27 Oct 2015 19:01:53 +0000 (20:01 +0100)
* add abstract definition for StackedOutboundQueue to allow
mor variability for possible child implementation

Change-Id: I04d8658e0ede049fb0b9265b57a5f7f528998442
Signed-off-by: Vaclav Demcak <vdemcak@cisco.com>
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java [new file with mode: 0644]
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java

index 49c6ddfa9fb47c155796cbdb3d453cd36704c22d..99bec867e582ae60c2996c67a483ac014c53b679 100644 (file)
@@ -29,7 +29,8 @@ import org.slf4j.LoggerFactory;
  * Class capsulate basic processing for stacking requests for netty channel
  * and provide functionality for pairing request/response device message communication.
  */
-abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter
+abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O extends AbstractStackedOutboundQueue>
+        extends ChannelInboundHandlerAdapter
         implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractOutboundQueueManager.class);
@@ -67,7 +68,7 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler> exte
     private final AtomicBoolean flushScheduled = new AtomicBoolean();
     protected final ConnectionAdapterImpl parent;
     protected final InetSocketAddress address;
-    protected final StackedOutboundQueue currentQueue;
+    protected final O currentQueue;
     private final T handler;
 
     // Accessed concurrently
@@ -89,12 +90,20 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler> exte
         this.parent = Preconditions.checkNotNull(parent);
         this.handler = Preconditions.checkNotNull(handler);
         this.address = address;
-        currentQueue = new StackedOutboundQueue(this);
+        /* Note: don't wish to use reflection here */
+        currentQueue = initializeStackedOutboudnqueue();
         LOG.debug("Queue manager instantiated with queue {}", currentQueue);
 
         handler.onConnectionQueueChanged(currentQueue);
     }
 
+    /**
+     * Method has to initialize some child of {@link AbstractStackedOutboundQueue}
+     *
+     * @return correct implementation of StacketOutboundqueue
+     */
+    protected abstract O initializeStackedOutboudnqueue();
+
     @Override
     public void close() {
         handler.onConnectionQueueChanged(null);
diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java
new file mode 100644 (file)
index 0000000..332f318
--- /dev/null
@@ -0,0 +1,128 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import com.google.common.base.Preconditions;
+import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class AbstractStackedOutboundQueue implements OutboundQueue {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractStackedOutboundQueue.class);
+
+    protected static final AtomicLongFieldUpdater<AbstractStackedOutboundQueue> LAST_XID_UPDATER = AtomicLongFieldUpdater
+            .newUpdater(AbstractStackedOutboundQueue.class, "lastXid");
+
+    @GuardedBy("unflushedSegments")
+    protected volatile StackedSegment firstSegment;
+    @GuardedBy("unflushedSegments")
+    protected final List<StackedSegment> unflushedSegments = new ArrayList<>(2);
+    @GuardedBy("unflushedSegments")
+    protected final List<StackedSegment> uncompletedSegments = new ArrayList<>(2);
+
+    private volatile long lastXid = -1;
+
+    @GuardedBy("unflushedSegments")
+    protected Integer shutdownOffset;
+
+    // Accessed from Netty only
+    protected int flushOffset;
+
+    protected final AbstractOutboundQueueManager<?, ?> manager;
+
+    AbstractStackedOutboundQueue(final AbstractOutboundQueueManager<?, ?> manager) {
+        this.manager = Preconditions.checkNotNull(manager);
+        firstSegment = StackedSegment.create(0L);
+        uncompletedSegments.add(firstSegment);
+        unflushedSegments.add(firstSegment);
+    }
+
+    /**
+     * Write some entries from the queue to the channel. Guaranteed to run
+     * in the corresponding EventLoop.
+     *
+     * @param channel Channel onto which we are writing
+     * @param now
+     * @return Number of entries written out
+     */
+    abstract int writeEntries(@Nonnull final Channel channel, final long now);
+
+    abstract boolean pairRequest(final OfHeader message);
+
+    boolean needsFlush() {
+        // flushOffset always points to the first entry, which can be changed only
+        // from Netty, so we are fine here.
+        if (firstSegment.getBaseXid() + flushOffset > lastXid) {
+            return false;
+        }
+
+        if (shutdownOffset != null && flushOffset >= shutdownOffset) {
+            return false;
+        }
+
+        return firstSegment.getEntry(flushOffset).isCommitted();
+    }
+
+    long startShutdown(final Channel channel) {
+        /*
+         * We are dealing with a multi-threaded shutdown, as the user may still
+         * be reserving entries in the queue. We are executing in a netty thread,
+         * so neither flush nor barrier can be running, which is good news.
+         * We will eat up all the slots in the queue here and mark the offset first
+         * reserved offset and free up all the cached queues. We then schedule
+         * the flush task, which will deal with the rest of the shutdown process.
+         */
+        synchronized (unflushedSegments) {
+            // Increment the offset by the segment size, preventing fast path allocations,
+            // since we are holding the slow path lock, any reservations will see the queue
+            // in shutdown and fail accordingly.
+            final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
+            shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
+
+            return lockedShutdownFlush();
+        }
+    }
+
+    boolean finishShutdown() {
+        synchronized (unflushedSegments) {
+            lockedShutdownFlush();
+        }
+
+        return !needsFlush();
+    }
+
+    @GuardedBy("unflushedSegments")
+    private long lockedShutdownFlush() {
+        long entries = 0;
+
+        // Fail all queues
+        final Iterator<StackedSegment> it = uncompletedSegments.iterator();
+        while (it.hasNext()) {
+            final StackedSegment segment = it.next();
+
+            entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
+            if (segment.isComplete()) {
+                LOG.trace("Cleared segment {}", segment);
+                it.remove();
+            }
+        }
+
+        return entries;
+    }
+}
index 6b90daae2695d1e937026af0dc730b6cbbd8b0b1..90db23da65d3dfa37b245edf3ce1b6af5d7fd951 100644 (file)
@@ -16,7 +16,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class OutboundQueueManager<T extends OutboundQueueHandler> extends AbstractOutboundQueueManager<T> {
+final class OutboundQueueManager<T extends OutboundQueueHandler> extends
+        AbstractOutboundQueueManager<T, StackedOutboundQueue> {
     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
 
     private final int maxNonBarrierMessages;
@@ -44,6 +45,10 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Abstrac
         this.maxBarrierNanos = maxBarrierNanos;
     }
 
+    @Override
+    protected StackedOutboundQueue initializeStackedOutboudnqueue() {
+        return new StackedOutboundQueue(this);
+    }
 
     private void scheduleBarrierTimer(final long now) {
         long next = lastBarrierNanos + maxBarrierNanos;
index 1da6dd3dff01203d3eaea854924c05a18e902ec6..46039904543950e1a074a0381b7d7414210f26bd 100644 (file)
@@ -11,46 +11,23 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.util.concurrent.FutureCallback;
 import io.netty.channel.Channel;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class StackedOutboundQueue implements OutboundQueue {
+final class StackedOutboundQueue extends AbstractStackedOutboundQueue {
     private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class);
     private static final AtomicLongFieldUpdater<StackedOutboundQueue> BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid");
-    private static final AtomicLongFieldUpdater<StackedOutboundQueue> LAST_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "lastXid");
-
-    @GuardedBy("unflushedSegments")
-    private volatile StackedSegment firstSegment;
-    @GuardedBy("unflushedSegments")
-    private final List<StackedSegment> unflushedSegments = new ArrayList<>(2);
-    @GuardedBy("unflushedSegments")
-    private final List<StackedSegment> uncompletedSegments = new ArrayList<>(2);
-    private final AbstractOutboundQueueManager<?> manager;
 
     private volatile long allocatedXid = -1;
     private volatile long barrierXid = -1;
-    private volatile long lastXid = -1;
 
-    @GuardedBy("unflushedSegments")
-    private Integer shutdownOffset;
-
-    // Accessed from Netty only
-    private int flushOffset;
-
-    StackedOutboundQueue(final AbstractOutboundQueueManager<?> manager) {
-        this.manager = Preconditions.checkNotNull(manager);
-        firstSegment = StackedSegment.create(0L);
-        uncompletedSegments.add(firstSegment);
-        unflushedSegments.add(firstSegment);
+    StackedOutboundQueue(final AbstractOutboundQueueManager<?, ?> manager) {
+        super(manager);
     }
 
     @GuardedBy("unflushedSegments")
@@ -163,14 +140,7 @@ final class StackedOutboundQueue implements OutboundQueue {
         manager.ensureFlushing();
     }
 
-    /**
-     * Write some entries from the queue to the channel. Guaranteed to run
-     * in the corresponding EventLoop.
-     *
-     * @param channel Channel onto which we are writing
-     * @param now
-     * @return Number of entries written out
-     */
+    @Override
     int writeEntries(@Nonnull final Channel channel, final long now) {
         // Local cache
         StackedSegment segment = firstSegment;
@@ -247,6 +217,7 @@ final class StackedOutboundQueue implements OutboundQueue {
         return reserveEntry();
     }
 
+    @Override
     boolean pairRequest(final OfHeader message) {
         Iterator<StackedSegment> it = uncompletedSegments.iterator();
         while (it.hasNext()) {
@@ -292,66 +263,4 @@ final class StackedOutboundQueue implements OutboundQueue {
         LOG.debug("Failed to find completion for message {}", message);
         return false;
     }
-
-    long startShutdown(final Channel channel) {
-        /*
-         * We are dealing with a multi-threaded shutdown, as the user may still
-         * be reserving entries in the queue. We are executing in a netty thread,
-         * so neither flush nor barrier can be running, which is good news.
-         *
-         * We will eat up all the slots in the queue here and mark the offset first
-         * reserved offset and free up all the cached queues. We then schedule
-         * the flush task, which will deal with the rest of the shutdown process.
-         */
-        synchronized (unflushedSegments) {
-            // Increment the offset by the segment size, preventing fast path allocations,
-            // since we are holding the slow path lock, any reservations will see the queue
-            // in shutdown and fail accordingly.
-            final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
-            shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
-
-            return lockedShutdownFlush();
-        }
-    }
-
-    @GuardedBy("unflushedSegments")
-    private long lockedShutdownFlush() {
-        long entries = 0;
-
-        // Fail all queues
-        final Iterator<StackedSegment> it = uncompletedSegments.iterator();
-        while (it.hasNext()) {
-            final StackedSegment segment = it.next();
-
-            entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
-            if (segment.isComplete()) {
-                LOG.trace("Cleared segment {}", segment);
-                it.remove();
-            }
-        }
-
-        return entries;
-    }
-
-    boolean finishShutdown() {
-        synchronized (unflushedSegments) {
-            lockedShutdownFlush();
-        }
-
-        return !needsFlush();
-    }
-
-    boolean needsFlush() {
-        // flushOffset always points to the first entry, which can be changed only
-        // from Netty, so we are fine here.
-        if (firstSegment.getBaseXid() + flushOffset > lastXid) {
-            return false;
-        }
-
-        if (shutdownOffset != null && flushOffset >= shutdownOffset) {
-            return false;
-        }
-
-        return firstSegment.getEntry(flushOffset).isCommitted();
-    }
 }