From 3545506a79947dec9b4ded9afe51388dadb16e27 Mon Sep 17 00:00:00 2001 From: Vaclav Demcak Date: Sun, 27 Sep 2015 01:55:53 +0200 Subject: [PATCH] Barrier turn on/off - StackedOutboundQueue definition * add abstract definition for StackedOutboundQueue to allow mor variability for possible child implementation Change-Id: I04d8658e0ede049fb0b9265b57a5f7f528998442 Signed-off-by: Vaclav Demcak --- .../AbstractOutboundQueueManager.java | 15 +- .../AbstractStackedOutboundQueue.java | 128 ++++++++++++++++++ .../core/connection/OutboundQueueManager.java | 7 +- .../core/connection/StackedOutboundQueue.java | 101 +------------- 4 files changed, 151 insertions(+), 100 deletions(-) create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java index 49c6ddfa..99bec867 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java @@ -29,7 +29,8 @@ import org.slf4j.LoggerFactory; * Class capsulate basic processing for stacking requests for netty channel * and provide functionality for pairing request/response device message communication. */ -abstract class AbstractOutboundQueueManager extends ChannelInboundHandlerAdapter +abstract class AbstractOutboundQueueManager + extends ChannelInboundHandlerAdapter implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(AbstractOutboundQueueManager.class); @@ -67,7 +68,7 @@ abstract class AbstractOutboundQueueManager exte private final AtomicBoolean flushScheduled = new AtomicBoolean(); protected final ConnectionAdapterImpl parent; protected final InetSocketAddress address; - protected final StackedOutboundQueue currentQueue; + protected final O currentQueue; private final T handler; // Accessed concurrently @@ -89,12 +90,20 @@ abstract class AbstractOutboundQueueManager exte this.parent = Preconditions.checkNotNull(parent); this.handler = Preconditions.checkNotNull(handler); this.address = address; - currentQueue = new StackedOutboundQueue(this); + /* Note: don't wish to use reflection here */ + currentQueue = initializeStackedOutboudnqueue(); LOG.debug("Queue manager instantiated with queue {}", currentQueue); handler.onConnectionQueueChanged(currentQueue); } + /** + * Method has to initialize some child of {@link AbstractStackedOutboundQueue} + * + * @return correct implementation of StacketOutboundqueue + */ + protected abstract O initializeStackedOutboudnqueue(); + @Override public void close() { handler.onConnectionQueueChanged(null); diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java new file mode 100644 index 00000000..332f318d --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowjava.protocol.impl.core.connection; + +import com.google.common.base.Preconditions; +import io.netty.channel.Channel; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class AbstractStackedOutboundQueue implements OutboundQueue { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractStackedOutboundQueue.class); + + protected static final AtomicLongFieldUpdater LAST_XID_UPDATER = AtomicLongFieldUpdater + .newUpdater(AbstractStackedOutboundQueue.class, "lastXid"); + + @GuardedBy("unflushedSegments") + protected volatile StackedSegment firstSegment; + @GuardedBy("unflushedSegments") + protected final List unflushedSegments = new ArrayList<>(2); + @GuardedBy("unflushedSegments") + protected final List uncompletedSegments = new ArrayList<>(2); + + private volatile long lastXid = -1; + + @GuardedBy("unflushedSegments") + protected Integer shutdownOffset; + + // Accessed from Netty only + protected int flushOffset; + + protected final AbstractOutboundQueueManager manager; + + AbstractStackedOutboundQueue(final AbstractOutboundQueueManager manager) { + this.manager = Preconditions.checkNotNull(manager); + firstSegment = StackedSegment.create(0L); + uncompletedSegments.add(firstSegment); + unflushedSegments.add(firstSegment); + } + + /** + * Write some entries from the queue to the channel. Guaranteed to run + * in the corresponding EventLoop. + * + * @param channel Channel onto which we are writing + * @param now + * @return Number of entries written out + */ + abstract int writeEntries(@Nonnull final Channel channel, final long now); + + abstract boolean pairRequest(final OfHeader message); + + boolean needsFlush() { + // flushOffset always points to the first entry, which can be changed only + // from Netty, so we are fine here. + if (firstSegment.getBaseXid() + flushOffset > lastXid) { + return false; + } + + if (shutdownOffset != null && flushOffset >= shutdownOffset) { + return false; + } + + return firstSegment.getEntry(flushOffset).isCommitted(); + } + + long startShutdown(final Channel channel) { + /* + * We are dealing with a multi-threaded shutdown, as the user may still + * be reserving entries in the queue. We are executing in a netty thread, + * so neither flush nor barrier can be running, which is good news. + * We will eat up all the slots in the queue here and mark the offset first + * reserved offset and free up all the cached queues. We then schedule + * the flush task, which will deal with the rest of the shutdown process. + */ + synchronized (unflushedSegments) { + // Increment the offset by the segment size, preventing fast path allocations, + // since we are holding the slow path lock, any reservations will see the queue + // in shutdown and fail accordingly. + final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE); + shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE); + + return lockedShutdownFlush(); + } + } + + boolean finishShutdown() { + synchronized (unflushedSegments) { + lockedShutdownFlush(); + } + + return !needsFlush(); + } + + @GuardedBy("unflushedSegments") + private long lockedShutdownFlush() { + long entries = 0; + + // Fail all queues + final Iterator it = uncompletedSegments.iterator(); + while (it.hasNext()) { + final StackedSegment segment = it.next(); + + entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED); + if (segment.isComplete()) { + LOG.trace("Cleared segment {}", segment); + it.remove(); + } + } + + return entries; + } +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java index 6b90daae..90db23da 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java @@ -16,7 +16,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731 import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class OutboundQueueManager extends AbstractOutboundQueueManager { +final class OutboundQueueManager extends + AbstractOutboundQueueManager { private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class); private final int maxNonBarrierMessages; @@ -44,6 +45,10 @@ final class OutboundQueueManager extends Abstrac this.maxBarrierNanos = maxBarrierNanos; } + @Override + protected StackedOutboundQueue initializeStackedOutboudnqueue() { + return new StackedOutboundQueue(this); + } private void scheduleBarrierTimer(final long now) { long next = lastBarrierNanos + maxBarrierNanos; diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java index 1da6dd3d..46039904 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java @@ -11,46 +11,23 @@ import com.google.common.base.Preconditions; import com.google.common.base.Verify; import com.google.common.util.concurrent.FutureCallback; import io.netty.channel.Channel; -import java.util.ArrayList; import java.util.Iterator; -import java.util.List; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; -import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; -import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class StackedOutboundQueue implements OutboundQueue { +final class StackedOutboundQueue extends AbstractStackedOutboundQueue { private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class); private static final AtomicLongFieldUpdater BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid"); - private static final AtomicLongFieldUpdater LAST_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "lastXid"); - - @GuardedBy("unflushedSegments") - private volatile StackedSegment firstSegment; - @GuardedBy("unflushedSegments") - private final List unflushedSegments = new ArrayList<>(2); - @GuardedBy("unflushedSegments") - private final List uncompletedSegments = new ArrayList<>(2); - private final AbstractOutboundQueueManager manager; private volatile long allocatedXid = -1; private volatile long barrierXid = -1; - private volatile long lastXid = -1; - @GuardedBy("unflushedSegments") - private Integer shutdownOffset; - - // Accessed from Netty only - private int flushOffset; - - StackedOutboundQueue(final AbstractOutboundQueueManager manager) { - this.manager = Preconditions.checkNotNull(manager); - firstSegment = StackedSegment.create(0L); - uncompletedSegments.add(firstSegment); - unflushedSegments.add(firstSegment); + StackedOutboundQueue(final AbstractOutboundQueueManager manager) { + super(manager); } @GuardedBy("unflushedSegments") @@ -163,14 +140,7 @@ final class StackedOutboundQueue implements OutboundQueue { manager.ensureFlushing(); } - /** - * Write some entries from the queue to the channel. Guaranteed to run - * in the corresponding EventLoop. - * - * @param channel Channel onto which we are writing - * @param now - * @return Number of entries written out - */ + @Override int writeEntries(@Nonnull final Channel channel, final long now) { // Local cache StackedSegment segment = firstSegment; @@ -247,6 +217,7 @@ final class StackedOutboundQueue implements OutboundQueue { return reserveEntry(); } + @Override boolean pairRequest(final OfHeader message) { Iterator it = uncompletedSegments.iterator(); while (it.hasNext()) { @@ -292,66 +263,4 @@ final class StackedOutboundQueue implements OutboundQueue { LOG.debug("Failed to find completion for message {}", message); return false; } - - long startShutdown(final Channel channel) { - /* - * We are dealing with a multi-threaded shutdown, as the user may still - * be reserving entries in the queue. We are executing in a netty thread, - * so neither flush nor barrier can be running, which is good news. - * - * We will eat up all the slots in the queue here and mark the offset first - * reserved offset and free up all the cached queues. We then schedule - * the flush task, which will deal with the rest of the shutdown process. - */ - synchronized (unflushedSegments) { - // Increment the offset by the segment size, preventing fast path allocations, - // since we are holding the slow path lock, any reservations will see the queue - // in shutdown and fail accordingly. - final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE); - shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE); - - return lockedShutdownFlush(); - } - } - - @GuardedBy("unflushedSegments") - private long lockedShutdownFlush() { - long entries = 0; - - // Fail all queues - final Iterator it = uncompletedSegments.iterator(); - while (it.hasNext()) { - final StackedSegment segment = it.next(); - - entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED); - if (segment.isComplete()) { - LOG.trace("Cleared segment {}", segment); - it.remove(); - } - } - - return entries; - } - - boolean finishShutdown() { - synchronized (unflushedSegments) { - lockedShutdownFlush(); - } - - return !needsFlush(); - } - - boolean needsFlush() { - // flushOffset always points to the first entry, which can be changed only - // from Netty, so we are fine here. - if (firstSegment.getBaseXid() + flushOffset > lastXid) { - return false; - } - - if (shutdownOffset != null && flushOffset >= shutdownOffset) { - return false; - } - - return firstSegment.getEntry(flushOffset).isCommitted(); - } } -- 2.36.6