X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflow-protocol-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowjava%2Fprotocol%2Fimpl%2Fcore%2Fconnection%2FAbstractStackedOutboundQueue.java;fp=openflow-protocol-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowjava%2Fprotocol%2Fimpl%2Fcore%2Fconnection%2FAbstractStackedOutboundQueue.java;h=332f318de45fc677b39d00ac9245689361de5051;hb=2429e5af03939e1f8db057247a654e361a266645;hp=0000000000000000000000000000000000000000;hpb=8bc243af9d7463bdd124b7c12b8fde15dddb2527;p=openflowjava.git diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java new file mode 100644 index 00000000..332f318d --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowjava.protocol.impl.core.connection; + +import com.google.common.base.Preconditions; +import io.netty.channel.Channel; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class AbstractStackedOutboundQueue implements OutboundQueue { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractStackedOutboundQueue.class); + + protected static final AtomicLongFieldUpdater LAST_XID_UPDATER = AtomicLongFieldUpdater + .newUpdater(AbstractStackedOutboundQueue.class, "lastXid"); + + @GuardedBy("unflushedSegments") + protected volatile StackedSegment firstSegment; + @GuardedBy("unflushedSegments") + protected final List unflushedSegments = new ArrayList<>(2); + @GuardedBy("unflushedSegments") + protected final List uncompletedSegments = new ArrayList<>(2); + + private volatile long lastXid = -1; + + @GuardedBy("unflushedSegments") + protected Integer shutdownOffset; + + // Accessed from Netty only + protected int flushOffset; + + protected final AbstractOutboundQueueManager manager; + + AbstractStackedOutboundQueue(final AbstractOutboundQueueManager manager) { + this.manager = Preconditions.checkNotNull(manager); + firstSegment = StackedSegment.create(0L); + uncompletedSegments.add(firstSegment); + unflushedSegments.add(firstSegment); + } + + /** + * Write some entries from the queue to the channel. Guaranteed to run + * in the corresponding EventLoop. + * + * @param channel Channel onto which we are writing + * @param now + * @return Number of entries written out + */ + abstract int writeEntries(@Nonnull final Channel channel, final long now); + + abstract boolean pairRequest(final OfHeader message); + + boolean needsFlush() { + // flushOffset always points to the first entry, which can be changed only + // from Netty, so we are fine here. + if (firstSegment.getBaseXid() + flushOffset > lastXid) { + return false; + } + + if (shutdownOffset != null && flushOffset >= shutdownOffset) { + return false; + } + + return firstSegment.getEntry(flushOffset).isCommitted(); + } + + long startShutdown(final Channel channel) { + /* + * We are dealing with a multi-threaded shutdown, as the user may still + * be reserving entries in the queue. We are executing in a netty thread, + * so neither flush nor barrier can be running, which is good news. + * We will eat up all the slots in the queue here and mark the offset first + * reserved offset and free up all the cached queues. We then schedule + * the flush task, which will deal with the rest of the shutdown process. + */ + synchronized (unflushedSegments) { + // Increment the offset by the segment size, preventing fast path allocations, + // since we are holding the slow path lock, any reservations will see the queue + // in shutdown and fail accordingly. + final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE); + shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE); + + return lockedShutdownFlush(); + } + } + + boolean finishShutdown() { + synchronized (unflushedSegments) { + lockedShutdownFlush(); + } + + return !needsFlush(); + } + + @GuardedBy("unflushedSegments") + private long lockedShutdownFlush() { + long entries = 0; + + // Fail all queues + final Iterator it = uncompletedSegments.iterator(); + while (it.hasNext()) { + final StackedSegment segment = it.next(); + + entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED); + if (segment.isComplete()) { + LOG.trace("Cleared segment {}", segment); + it.remove(); + } + } + + return entries; + } +}