* Class capsulate basic processing for stacking requests for netty channel
* and provide functionality for pairing request/response device message communication.
*/
-abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter
+abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O extends AbstractStackedOutboundQueue>
+ extends ChannelInboundHandlerAdapter
implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractOutboundQueueManager.class);
private final AtomicBoolean flushScheduled = new AtomicBoolean();
protected final ConnectionAdapterImpl parent;
protected final InetSocketAddress address;
- protected final StackedOutboundQueue currentQueue;
+ protected final O currentQueue;
private final T handler;
// Accessed concurrently
this.parent = Preconditions.checkNotNull(parent);
this.handler = Preconditions.checkNotNull(handler);
this.address = address;
- currentQueue = new StackedOutboundQueue(this);
+ /* Note: don't wish to use reflection here */
+ currentQueue = initializeStackedOutboudnqueue();
LOG.debug("Queue manager instantiated with queue {}", currentQueue);
handler.onConnectionQueueChanged(currentQueue);
}
+ /**
+ * Method has to initialize some child of {@link AbstractStackedOutboundQueue}
+ *
+ * @return correct implementation of StacketOutboundqueue
+ */
+ protected abstract O initializeStackedOutboudnqueue();
+
@Override
public void close() {
handler.onConnectionQueueChanged(null);
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import com.google.common.base.Preconditions;
+import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class AbstractStackedOutboundQueue implements OutboundQueue {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractStackedOutboundQueue.class);
+
+ protected static final AtomicLongFieldUpdater<AbstractStackedOutboundQueue> LAST_XID_UPDATER = AtomicLongFieldUpdater
+ .newUpdater(AbstractStackedOutboundQueue.class, "lastXid");
+
+ @GuardedBy("unflushedSegments")
+ protected volatile StackedSegment firstSegment;
+ @GuardedBy("unflushedSegments")
+ protected final List<StackedSegment> unflushedSegments = new ArrayList<>(2);
+ @GuardedBy("unflushedSegments")
+ protected final List<StackedSegment> uncompletedSegments = new ArrayList<>(2);
+
+ private volatile long lastXid = -1;
+
+ @GuardedBy("unflushedSegments")
+ protected Integer shutdownOffset;
+
+ // Accessed from Netty only
+ protected int flushOffset;
+
+ protected final AbstractOutboundQueueManager<?, ?> manager;
+
+ AbstractStackedOutboundQueue(final AbstractOutboundQueueManager<?, ?> manager) {
+ this.manager = Preconditions.checkNotNull(manager);
+ firstSegment = StackedSegment.create(0L);
+ uncompletedSegments.add(firstSegment);
+ unflushedSegments.add(firstSegment);
+ }
+
+ /**
+ * Write some entries from the queue to the channel. Guaranteed to run
+ * in the corresponding EventLoop.
+ *
+ * @param channel Channel onto which we are writing
+ * @param now
+ * @return Number of entries written out
+ */
+ abstract int writeEntries(@Nonnull final Channel channel, final long now);
+
+ abstract boolean pairRequest(final OfHeader message);
+
+ boolean needsFlush() {
+ // flushOffset always points to the first entry, which can be changed only
+ // from Netty, so we are fine here.
+ if (firstSegment.getBaseXid() + flushOffset > lastXid) {
+ return false;
+ }
+
+ if (shutdownOffset != null && flushOffset >= shutdownOffset) {
+ return false;
+ }
+
+ return firstSegment.getEntry(flushOffset).isCommitted();
+ }
+
+ long startShutdown(final Channel channel) {
+ /*
+ * We are dealing with a multi-threaded shutdown, as the user may still
+ * be reserving entries in the queue. We are executing in a netty thread,
+ * so neither flush nor barrier can be running, which is good news.
+ * We will eat up all the slots in the queue here and mark the offset first
+ * reserved offset and free up all the cached queues. We then schedule
+ * the flush task, which will deal with the rest of the shutdown process.
+ */
+ synchronized (unflushedSegments) {
+ // Increment the offset by the segment size, preventing fast path allocations,
+ // since we are holding the slow path lock, any reservations will see the queue
+ // in shutdown and fail accordingly.
+ final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
+ shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
+
+ return lockedShutdownFlush();
+ }
+ }
+
+ boolean finishShutdown() {
+ synchronized (unflushedSegments) {
+ lockedShutdownFlush();
+ }
+
+ return !needsFlush();
+ }
+
+ @GuardedBy("unflushedSegments")
+ private long lockedShutdownFlush() {
+ long entries = 0;
+
+ // Fail all queues
+ final Iterator<StackedSegment> it = uncompletedSegments.iterator();
+ while (it.hasNext()) {
+ final StackedSegment segment = it.next();
+
+ entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
+ if (segment.isComplete()) {
+ LOG.trace("Cleared segment {}", segment);
+ it.remove();
+ }
+ }
+
+ return entries;
+ }
+}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class OutboundQueueManager<T extends OutboundQueueHandler> extends AbstractOutboundQueueManager<T> {
+final class OutboundQueueManager<T extends OutboundQueueHandler> extends
+ AbstractOutboundQueueManager<T, StackedOutboundQueue> {
private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
private final int maxNonBarrierMessages;
this.maxBarrierNanos = maxBarrierNanos;
}
+ @Override
+ protected StackedOutboundQueue initializeStackedOutboudnqueue() {
+ return new StackedOutboundQueue(this);
+ }
private void scheduleBarrierTimer(final long now) {
long next = lastBarrierNanos + maxBarrierNanos;
import com.google.common.base.Verify;
import com.google.common.util.concurrent.FutureCallback;
import io.netty.channel.Channel;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class StackedOutboundQueue implements OutboundQueue {
+final class StackedOutboundQueue extends AbstractStackedOutboundQueue {
private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class);
private static final AtomicLongFieldUpdater<StackedOutboundQueue> BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid");
- private static final AtomicLongFieldUpdater<StackedOutboundQueue> LAST_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "lastXid");
-
- @GuardedBy("unflushedSegments")
- private volatile StackedSegment firstSegment;
- @GuardedBy("unflushedSegments")
- private final List<StackedSegment> unflushedSegments = new ArrayList<>(2);
- @GuardedBy("unflushedSegments")
- private final List<StackedSegment> uncompletedSegments = new ArrayList<>(2);
- private final AbstractOutboundQueueManager<?> manager;
private volatile long allocatedXid = -1;
private volatile long barrierXid = -1;
- private volatile long lastXid = -1;
- @GuardedBy("unflushedSegments")
- private Integer shutdownOffset;
-
- // Accessed from Netty only
- private int flushOffset;
-
- StackedOutboundQueue(final AbstractOutboundQueueManager<?> manager) {
- this.manager = Preconditions.checkNotNull(manager);
- firstSegment = StackedSegment.create(0L);
- uncompletedSegments.add(firstSegment);
- unflushedSegments.add(firstSegment);
+ StackedOutboundQueue(final AbstractOutboundQueueManager<?, ?> manager) {
+ super(manager);
}
@GuardedBy("unflushedSegments")
manager.ensureFlushing();
}
- /**
- * Write some entries from the queue to the channel. Guaranteed to run
- * in the corresponding EventLoop.
- *
- * @param channel Channel onto which we are writing
- * @param now
- * @return Number of entries written out
- */
+ @Override
int writeEntries(@Nonnull final Channel channel, final long now) {
// Local cache
StackedSegment segment = firstSegment;
return reserveEntry();
}
+ @Override
boolean pairRequest(final OfHeader message) {
Iterator<StackedSegment> it = uncompletedSegments.iterator();
while (it.hasNext()) {
LOG.debug("Failed to find completion for message {}", message);
return false;
}
-
- long startShutdown(final Channel channel) {
- /*
- * We are dealing with a multi-threaded shutdown, as the user may still
- * be reserving entries in the queue. We are executing in a netty thread,
- * so neither flush nor barrier can be running, which is good news.
- *
- * We will eat up all the slots in the queue here and mark the offset first
- * reserved offset and free up all the cached queues. We then schedule
- * the flush task, which will deal with the rest of the shutdown process.
- */
- synchronized (unflushedSegments) {
- // Increment the offset by the segment size, preventing fast path allocations,
- // since we are holding the slow path lock, any reservations will see the queue
- // in shutdown and fail accordingly.
- final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
- shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
-
- return lockedShutdownFlush();
- }
- }
-
- @GuardedBy("unflushedSegments")
- private long lockedShutdownFlush() {
- long entries = 0;
-
- // Fail all queues
- final Iterator<StackedSegment> it = uncompletedSegments.iterator();
- while (it.hasNext()) {
- final StackedSegment segment = it.next();
-
- entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
- if (segment.isComplete()) {
- LOG.trace("Cleared segment {}", segment);
- it.remove();
- }
- }
-
- return entries;
- }
-
- boolean finishShutdown() {
- synchronized (unflushedSegments) {
- lockedShutdownFlush();
- }
-
- return !needsFlush();
- }
-
- boolean needsFlush() {
- // flushOffset always points to the first entry, which can be changed only
- // from Netty, so we are fine here.
- if (firstSegment.getBaseXid() + flushOffset > lastXid) {
- return false;
- }
-
- if (shutdownOffset != null && flushOffset >= shutdownOffset) {
- return false;
- }
-
- return firstSegment.getEntry(flushOffset).isCommitted();
- }
}