* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
-
package org.opendaylight.openflowjava.protocol.api.connection;
+import com.google.common.annotations.Beta;
import java.net.InetSocketAddress;
import java.util.concurrent.Future;
-
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
* @return true, if channel is configured to autoread
*/
boolean isAutoRead();
+
+ /**
+ * Registers a handler for a new outbound queue which bypasses the regular RPC processing.
+ * @param handler handler which will be notified of queue changes and asked to produce barrier requests
+ * @param maxQueueDepth maximum number of messages a single queue may hold
+ * @param maxBarrierNanos maximum time, in nanoseconds, between two consecutive barrier messages
+ * @return An {@link OutboundQueueHandlerRegistration}
+ */
+ @Beta
+ <T extends OutboundQueueHandler> OutboundQueueHandlerRegistration<T> registerOutboundQueueHandler(T handler,
+ int maxQueueDepth, long maxBarrierNanos);
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.api.connection;
+
+import com.google.common.annotations.Beta;
+import com.google.common.util.concurrent.FutureCallback;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
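+/**
+ * A per-connection transmit queue. Users first reserve an XID via
+ * {@link #reserveEntry()} and then either commit a message for that XID or
+ * cancel the reservation via {@link #commitEntry(Long, OfHeader, FutureCallback)}.
+ *
+ * <p>
+ * A minimal usage sketch; {@code buildRequest()} is a hypothetical helper shown
+ * only for illustration and is not part of this API:
+ * <pre>{@code
+ *     final Long xid = queue.reserveEntry();
+ *     if (xid != null) {
+ *         queue.commitEntry(xid, buildRequest(xid), callback);
+ *     } else {
+ *         // Queue is full, back off and retry later
+ *     }
+ * }</pre>
+ */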
+@Beta
+public interface OutboundQueue {
+ /**
+ * Reserve an entry in the outbound queue.
+ * @return XID for the new message, or null if the queue is full
+ */
+ Long reserveEntry();
+
+ /**
+ * Commit the previously-reserved entry using a message. The specified callback will
+ * be invoked once we know how it has resolved, either with a normal response,
+ * implied completion via a barrier, or failure (such as connection drop). For
+ * multipart responses, {@link FutureCallback#onSuccess(Object)} will be invoked
+ * multiple times as the corresponding responses arrive. If the request is completed
+ * with a response, the object reported will be non-null. If the request's completion
+ * is implied by a barrier, the object reported will be null.
+ *
+ * @param xid Previously-reserved XID
+ * @param message Message which should be sent out, or null if the reservation
+ * should be cancelled.
+ * @param callback Callback to be invoked, or null if no callback should be invoked.
+ * @throws IllegalArgumentException if the slot is already committed or was never reserved.
+ */
+ void commitEntry(@Nonnull Long xid, @Nullable OfHeader message, @Nullable FutureCallback<OfHeader> callback);
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.api.connection;
+
+import com.google.common.annotations.Beta;
+import javax.annotation.Nonnull;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+
+/**
+ * Handler of the outbound queue. The queue has a maximum depth assigned when the
+ * handler is registered.
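+ *
+ * <p>
+ * A minimal sketch of a handler's barrier factory; the {@code version} field and
+ * the BarrierInputBuilder calls are assumptions based on the generated YANG
+ * bindings, shown for illustration only:
+ * <pre>{@code
+ *     public BarrierInput createBarrierRequest(final Long xid) {
+ *         return new BarrierInputBuilder().setVersion(version).setXid(xid).build();
+ *     }
+ * }</pre>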
+ */
+@Beta
+public interface OutboundQueueHandler {
+ /**
+ * Create a new {@link BarrierInput barrier} message. This callback is invoked
+ * when the queue is being flushed to the switch. The barrier ensures that any
+ * outstanding requests are detected as either completed or failed.
+ *
+ * @param xid XID for the barrier message
+ * @return New barrier message.
+ */
+ @Nonnull BarrierInput createBarrierRequest(@Nonnull Long xid);
+
+ /**
+ * Invoked whenever the underlying queue is replaced with a new instance. Implementations
+ * should ensure they are talking to the latest queue instance.
+ * @param queue New queue instance; null indicates a shutdown, i.e. the queue
+ * is no longer available.
+ */
+ void onConnectionQueueChanged(OutboundQueue queue);
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.api.connection;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+
+/**
+ * An {@link ObjectRegistration} of an {@link OutboundQueueHandler}. Registration can be cancelled
+ * by invoking {@link #close()}.
+ *
+ * @param <T> Handler type
+ */
+@Beta
+public interface OutboundQueueHandlerRegistration<T extends OutboundQueueHandler> extends ObjectRegistration<T> {
+ @Override
+ void close();
+}
package org.opendaylight.openflowjava.protocol.impl.core.connection;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
-
import java.net.InetSocketAddress;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
-
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
import org.opendaylight.openflowjava.statistics.CounterEventTypes;
import org.opendaylight.openflowjava.statistics.StatisticsCounters;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalCause;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-
/**
* Handles messages (notifications + rpcs) and connections
* @author mirehak
private ConnectionReadyListener connectionReadyListener;
private OpenflowProtocolListener messageListener;
private SystemNotificationsListener systemListener;
+ private OutboundQueueManager<?> outputManager;
private boolean disconnectOccured = false;
- private StatisticsCounters statisticsCounters;
+ private final StatisticsCounters statisticsCounters;
+ private final InetSocketAddress address;
/**
* default ctor
.removalListener(REMOVAL_LISTENER).build();
this.channel = Preconditions.checkNotNull(channel);
this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH, address);
+ this.address = address;
channel.pipeline().addLast(output);
statisticsCounters = StatisticsCounters.getInstance();
LOG.debug("ConnectionAdapter created");
statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
} else if (message instanceof ExperimenterMessage) {
messageListener.onExperimenterMessage((ExperimenterMessage) message);
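+ // This may be a reply to a queued request; give the queue manager a chance to pair it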
+ if (outputManager != null) {
+ outputManager.onMessage((OfHeader) message);
+ }
statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
} else if (message instanceof FlowRemovedMessage) {
messageListener.onFlowRemovedMessage((FlowRemovedMessage) message);
statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
} else if (message instanceof MultipartReplyMessage) {
messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
+ if (outputManager != null) {
+ outputManager.onMessage((OfHeader) message);
+ }
statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
} else if (message instanceof PacketInMessage) {
messageListener.onPacketInMessage((PacketInMessage) message);
} else {
LOG.warn("message listening not supported for type: {}", message.getClass());
}
- } else {
- if (message instanceof OfHeader) {
- LOG.debug("OFheader msg received");
+ } else if (message instanceof OfHeader) {
+ LOG.debug("OFheader msg received");
+
+ if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
RpcResponseKey key = createRpcResponseKey((OfHeader) message);
final ResponseExpectedRpcListener<?> listener = findRpcResponse(key);
if (listener != null) {
} else {
LOG.warn("received unexpected rpc response: {}", key);
}
- } else {
- LOG.warn("message listening not supported for type: {}", message.getClass());
}
+ } else {
+ LOG.warn("message listening not supported for type: {}", message.getClass());
}
}
* Used only for testing purposes
* @param cache
*/
- public void setResponseCache(Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> cache) {
+ public void setResponseCache(final Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> cache) {
this.responseCache = cache;
}
}
@Override
- public void setAutoRead(boolean autoRead) {
+ public void setAutoRead(final boolean autoRead) {
channel.config().setAutoRead(autoRead);
}
+
+ @Override
+ public <T extends OutboundQueueHandler> OutboundQueueHandlerRegistration<T> registerOutboundQueueHandler(
+ final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
+ Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
+
+ final OutboundQueueManager<T> ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
+ outputManager = ret;
+ channel.pipeline().addLast(outputManager);
+
+ return new OutboundQueueHandlerRegistrationImpl<T>(handler) {
+ @Override
+ protected void removeRegistration() {
+ outputManager.close();
+ channel.pipeline().remove(outputManager);
+ outputManager = null;
+ }
+ };
+ }
+
+ Channel getChannel() {
+ return channel;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class OutboundQueueEntry {
+ private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueEntry.class);
+ private FutureCallback<OfHeader> callback;
+ private OfHeader message;
+ private boolean completed;
+ private volatile boolean committed;
+
+ void commit(final OfHeader message, final FutureCallback<OfHeader> callback) {
+ this.message = message;
+ this.callback = callback;
+
+ // Volatile write, needs to be last
+ committed = true;
+ }
+
+ void reset() {
+ callback = null;
+ message = null;
+ completed = false;
+
+ // Volatile write, needs to be last
+ committed = false;
+ }
+
+ boolean isBarrier() {
+ return message instanceof BarrierInput;
+ }
+
+ boolean isCommitted() {
+ return committed;
+ }
+
+ boolean isCompleted() {
+ return completed;
+ }
+
+ OfHeader getMessage() {
+ return message;
+ }
+
+ boolean complete(final OfHeader response) {
+ Preconditions.checkState(!completed, "Attempted to complete a completed message %s with response %s", message, response);
+
+ // Multipart requests are special, we have to look at them to see
+ // if there is something outstanding and adjust ourselves accordingly
+ final boolean reallyComplete;
+ if (response instanceof MultipartReplyMessage) {
+ reallyComplete = !((MultipartReplyMessage) response).getFlags().isOFPMPFREQMORE();
+ LOG.debug("Multipart reply {}", response);
+ } else {
+ reallyComplete = true;
+ }
+
+ completed = reallyComplete;
+ if (callback != null) {
+ callback.onSuccess(response);
+ }
+ LOG.debug("Entry {} completed {} with response {}", this, completed, response);
+ return reallyComplete;
+ }
+
+ void fail(final Throwable cause) {
+ if (!completed) {
+ completed = true;
+ if (callback != null) {
+ callback.onFailure(cause);
+ }
+ } else {
+ LOG.warn("Ignoring failure {} for completed message {}", cause, message);
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+
+abstract class OutboundQueueHandlerRegistrationImpl<T extends OutboundQueueHandler> extends AbstractObjectRegistration<T> implements OutboundQueueHandlerRegistration<T> {
+ protected OutboundQueueHandlerRegistrationImpl(final T instance) {
+ super(instance);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.LockSupport;
+import javax.annotation.Nonnull;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class OutboundQueueImpl implements OutboundQueue {
+ private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class);
+ private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> CURRENT_OFFSET_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
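+ // How long flushEntry() is willing to wait for a racing commit before giving up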
+ private static final long FLUSH_RETRY_NANOS = 1L;
+ private final OutboundQueueManager<?> manager;
+ private final OutboundQueueEntry[] queue;
+ private final long baseXid;
+ private final long endXid;
+ private final int reserve;
+
+ // Updated concurrently
+ private volatile int reserveOffset;
+
+ // Updated from Netty only
+ private int flushOffset;
+ private int completeCount;
+
+ OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final int maxQueue) {
+ /*
+ * We use the last entry as an emergency should a timeout-triggered
+ * flush request race with normal users for the last entry in this
+ * queue. In that case the flush request will take the last entry and
+ * schedule a flush, which means that we will get around sending the
+ * message as soon as the user finishes the reservation.
+ */
+ Preconditions.checkArgument(maxQueue > 1);
+ this.baseXid = baseXid;
+ this.endXid = baseXid + maxQueue;
+ this.reserve = maxQueue - 1;
+ this.manager = Preconditions.checkNotNull(manager);
+ queue = new OutboundQueueEntry[maxQueue];
+ for (int i = 0; i < maxQueue; ++i) {
+ queue[i] = new OutboundQueueEntry();
+ }
+ }
+
+ private OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final OutboundQueueEntry[] queue) {
+ this.manager = Preconditions.checkNotNull(manager);
+ this.queue = Preconditions.checkNotNull(queue);
+ this.baseXid = baseXid;
+ this.endXid = baseXid + queue.length;
+ this.reserve = queue.length - 1;
+ for (OutboundQueueEntry element : queue) {
+ element.reset();
+ }
+ }
+
+ OutboundQueueImpl reuse(final long baseXid) {
+ return new OutboundQueueImpl(manager, baseXid, queue);
+ }
+
+ Long reserveEntry(final boolean forBarrier) {
+ final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this);
+ if (offset >= reserve) {
+ if (forBarrier) {
+ LOG.debug("Queue {} offset {}/{}, using emergency slot", this, offset, queue.length);
+ return endXid;
+ } else {
+ LOG.debug("Queue {} offset {}/{}, not allowing reservation", this, offset, queue.length);
+ return null;
+ }
+ }
+
+ final Long xid = baseXid + offset;
+ LOG.debug("Queue {} allocated XID {} at offset {}", this, xid, offset);
+ return xid;
+ }
+
+ @Override
+ public Long reserveEntry() {
+ return reserveEntry(false);
+ }
+
+ @Override
+ public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
+ final int offset = (int)(xid - baseXid);
+ if (message != null) {
+ Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
+ }
+
+ queue[offset].commit(message, callback);
+ LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
+
+ manager.ensureFlushing(this);
+ }
+
+ /**
+ * An empty queue is a queue which has no further unflushed entries.
+ *
+ * @return True if this queue does not have any unflushed entries.
+ */
+ boolean isEmpty() {
+ int ro = reserveOffset;
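+ // If all regular slots are taken, count the emergency (barrier) slot only if it has actually been committed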
+ if (ro >= reserve) {
+ if (queue[reserve].isCommitted()) {
+ ro = reserve + 1;
+ } else {
+ ro = reserve;
+ }
+ }
+
+ LOG.debug("Effective flush/reserve offset {}/{}", flushOffset, ro);
+ return ro <= flushOffset;
+ }
+
+ /**
+ * A queue is finished when all of its entries have been completed.
+ *
+ * @return False if there are any uncompleted requests.
+ */
+ boolean isFinished() {
+ if (completeCount < reserve) {
+ return false;
+ }
+
+ // We need to check if the last entry was used
+ final OutboundQueueEntry last = queue[reserve];
+ return !last.isCommitted() || last.isCompleted();
+ }
+
+ boolean isFlushed() {
+ LOG.debug("Check queue {} for completeness (offset {}, reserve {})", this, flushOffset, reserve);
+ if (flushOffset < reserve) {
+ return false;
+ }
+
+ // flushOffset implied == reserve
+ return flushOffset >= queue.length || !queue[reserve].isCommitted();
+ }
+
+ OfHeader flushEntry() {
+ for (;;) {
+ // No message ready
+ if (isEmpty()) {
+ LOG.debug("Flush offset {} is up-to-date with reservations", flushOffset);
+ return null;
+ }
+
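+ // The entry at flushOffset may have been reserved but not yet committed by
+ // its user. Rather than spinning, park once for FLUSH_RETRY_NANOS and then
+ // give up; the commit will trigger another flush via ensureFlushing().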
+ boolean retry = true;
+ while (!queue[flushOffset].isCommitted()) {
+ if (!retry) {
+ LOG.debug("Offset {} not ready yet, giving up", flushOffset);
+ return null;
+ }
+
+ LOG.debug("Offset {} not ready yet, retrying", flushOffset);
+ LockSupport.parkNanos(FLUSH_RETRY_NANOS);
+ retry = false;
+ }
+
+ final OfHeader msg = queue[flushOffset++].getMessage();
+ if (msg != null) {
+ return msg;
+ }
+ }
+ }
+
+ private boolean xidInRange(final long xid) {
+ return xid < endXid && (xid >= baseXid || baseXid > endXid);
+ }
+
+ /**
+ * Return the request entry corresponding to a response. Returns null
+ * if there is no request matching the response.
+ *
+ * @param response Response message
+ * @return Matching request entry, or null if no match is found.
+ */
+ OutboundQueueEntry pairRequest(@Nonnull final OfHeader response) {
+ final Long xid = response.getXid();
+ if (!xidInRange(xid)) {
+ LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, queue.length, xid);
+ return null;
+ }
+
+ final int offset = (int)(xid - baseXid);
+ final OutboundQueueEntry entry = queue[offset];
+ if (entry.isCompleted()) {
+ LOG.debug("Entry {} is already completed, not accepting response {}", entry, response);
+ return null;
+ }
+
+ if (entry.complete(response)) {
+ completeCount++;
+
+ // This has been a barrier -- make sure we complete all preceding requests
+ if (entry.isBarrier()) {
+ LOG.debug("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid, xid - 1);
+ for (int i = 0; i < offset; ++i) {
+ final OutboundQueueEntry e = queue[i];
+ if (!e.isCompleted() && e.complete(null)) {
+ completeCount++;
+ }
+ }
+ }
+ }
+ return entry;
+ }
+
+ void completeAll() {
+ for (OutboundQueueEntry entry : queue) {
+ if (!entry.isCompleted() && entry.complete(null)) {
+ completeCount++;
+ }
+ }
+ }
+
+ int failAll(final Throwable cause) {
+ int ret = 0;
+ for (OutboundQueueEntry entry : queue) {
+ if (!entry.isCompleted()) {
+ entry.fail(cause);
+ ret++;
+ }
+ }
+
+ return ret;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.net.InetSocketAddress;
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
+
+ /**
+ * This is the default upper bound we place on the flush task running
+ * a single iteration. We relinquish control after about this amount
+ * of time.
+ */
+ private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
+
+ /**
+ * We re-check the time spent flushing every this many messages. We do this because
+ * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
+ * or similar to disable the feature.
+ */
+ private static final int WORKTIME_RECHECK_MSGS = 64;
+
+ /**
+ * We maintain a cache of this many previous queues for later reuse.
+ */
+ private static final int QUEUE_CACHE_SIZE = 4;
+
+ private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
+ private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
+ private final ConnectionAdapterImpl parent;
+ private final InetSocketAddress address;
+ private final long maxBarrierNanos;
+ private final long maxWorkTime;
+ private final int queueSize;
+ private final T handler;
+
+ /*
+ * Instead of using an AtomicBoolean object, we use an int field and an atomic
+ * updater. It saves us from allocating an extra object.
+ */
+ @SuppressWarnings("rawtypes")
+ private static final AtomicIntegerFieldUpdater<OutboundQueueManager> FLUSH_SCHEDULED_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(OutboundQueueManager.class, "flushScheduled");
+ private volatile int flushScheduled = 0;
+
+ // Updated from Netty only
+ private long lastBarrierNanos = System.nanoTime();
+ private OutboundQueueImpl currentQueue;
+ private int nonBarrierMessages;
+ private long lastXid = 0;
+
+ // Passed to executor to request triggering of flush
+ private final Runnable flushRunnable = new Runnable() {
+ @Override
+ public void run() {
+ flush();
+ }
+ };
+ private final Runnable barrierRunnable = new Runnable() {
+ @Override
+ public void run() {
+ barrier();
+ }
+ };
+
+ OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
+ final int queueSize, final long maxBarrierNanos) {
+ this.parent = Preconditions.checkNotNull(parent);
+ this.handler = Preconditions.checkNotNull(handler);
+ Preconditions.checkArgument(queueSize > 0);
+ this.queueSize = queueSize;
+ Preconditions.checkArgument(maxBarrierNanos > 0);
+ this.maxBarrierNanos = maxBarrierNanos;
+ this.address = address;
+ this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
+
+ LOG.debug("Queue manager instantiated with queue size {}", queueSize);
+ createQueue();
+ scheduleBarrierTimer(lastBarrierNanos);
+ }
+
+ T getHandler() {
+ return handler;
+ }
+
+ @Override
+ public void close() {
+ handler.onConnectionQueueChanged(null);
+ }
+
+ private void retireQueue(final OutboundQueueImpl queue) {
+ if (queueCache.offer(queue)) {
+ LOG.debug("Saving queue {} for later reuse", queue);
+ } else {
+ LOG.debug("Queue {} thrown away", queue);
+ }
+ }
+
+ private void createQueue() {
+ final long baseXid = lastXid;
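+ // Advance by queueSize + 1, as the queue carries one extra emergency slot for a barrier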
+ lastXid += queueSize + 1;
+
+ final OutboundQueueImpl cached = queueCache.poll();
+ final OutboundQueueImpl queue;
+ if (cached != null) {
+ queue = cached.reuse(baseXid);
+ LOG.debug("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
+ } else {
+ queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
+ LOG.debug("Allocated new queue {} on channel {}", queue, parent.getChannel());
+ }
+
+ activeQueues.add(queue);
+ currentQueue = queue;
+ handler.onConnectionQueueChanged(queue);
+ }
+
+ private void scheduleBarrierTimer(final long now) {
+ long next = lastBarrierNanos + maxBarrierNanos;
+ if (next < now) {
+ LOG.debug("Attempted to schedule barrier in the past, resetting to maximum");
+ next = now + maxBarrierNanos;
+ }
+
+ final long delay = next - now;
+ LOG.debug("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
+ parent.getChannel().eventLoop().schedule(barrierRunnable, delay, TimeUnit.NANOSECONDS);
+ }
+
+ private void scheduleBarrierMessage() {
+ final Long xid = currentQueue.reserveEntry(true);
+ Verify.verifyNotNull(xid);
+
+ currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
+ LOG.debug("Barrier XID {} scheduled", xid);
+
+ // We can see into the future when compared to flushEntry(), as that
+ // codepath may be lagging behind on messages. Resetting the counter
+ // here ensures that flushEntry() will not attempt to issue a flush
+ // request. Note that we do not reset current time, as that should
+ // reflect when we sent the message for real.
+ nonBarrierMessages = 0;
+ }
+
+ /**
+ * Flush an entry from the queue.
+ *
+ * @param now Time reference for 'now'. We take this as an argument, as
+ * we need a timestamp to mark barrier messages we see swinging
+ * by. That timestamp does not need to be completely accurate,
+ * hence we use the flush start time. Alternative would be to
+ * measure System.nanoTime() for each barrier -- needlessly
+ * adding overhead.
+ *
+ * @return Entry which was flushed, null if no entry is ready.
+ */
+ OfHeader flushEntry(final long now) {
+ final OfHeader message = currentQueue.flushEntry();
+ if (currentQueue.isFlushed()) {
+ LOG.debug("Queue {} is fully flushed", currentQueue);
+ createQueue();
+ }
+
+ if (message == null) {
+ return null;
+ }
+
+ if (message instanceof BarrierInput) {
+ LOG.debug("Barrier message seen, resetting counters");
+ nonBarrierMessages = 0;
+ lastBarrierNanos = now;
+ } else {
+ nonBarrierMessages++;
+ if (nonBarrierMessages >= queueSize) {
+ LOG.debug("Scheduling barrier request after {} non-barrier messages", nonBarrierMessages);
+ scheduleBarrierMessage();
+ }
+ }
+
+ return message;
+ }
+
+ /**
+ * Invoked whenever a message comes in from the switch. Runs matching
+ * on all active queues in an attempt to complete a previous request.
+ *
+ * @param message Potential response message
+ * @return True if the message matched a previous request, false otherwise.
+ */
+ boolean onMessage(final OfHeader message) {
+ LOG.debug("Attempting to pair message {} to a request", message);
+
+ Iterator<OutboundQueueImpl> it = activeQueues.iterator();
+ while (it.hasNext()) {
+ final OutboundQueueImpl queue = it.next();
+ final OutboundQueueEntry entry = queue.pairRequest(message);
+
+ if (entry == null) {
+ continue;
+ }
+
+ LOG.debug("Queue {} accepted response {}", queue, message);
+
+ // This has been a barrier request, we need to flush all
+ // previous queues
+ if (entry.isBarrier() && activeQueues.size() > 1) {
+ LOG.debug("Queue {} indicated request was a barrier", queue);
+
+ it = activeQueues.iterator();
+ while (it.hasNext()) {
+ final OutboundQueueImpl q = it.next();
+
+ // We want to complete all queues before the current one, we will
+ // complete the current queue below
+ if (!queue.equals(q)) {
+ LOG.debug("Queue {} is implied finished", q);
+ q.completeAll();
+ it.remove();
+ retireQueue(q);
+ } else {
+ break;
+ }
+ }
+ }
+
+ if (queue.isFinished()) {
+ LOG.debug("Queue {} is finished", queue);
+ it.remove();
+ retireQueue(queue);
+ }
+
+ return true;
+ }
+
+ LOG.debug("Failed to find completion for message {}", message);
+ return false;
+ }
+
+ private void scheduleFlush() {
+ if (parent.getChannel().isWritable()) {
+ if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
+ LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
+ parent.getChannel().eventLoop().execute(flushRunnable);
+ } else {
+ LOG.trace("Flush task is already present on channel {}", parent.getChannel());
+ }
+ } else {
+ LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel());
+ }
+ }
+
+ void ensureFlushing(final OutboundQueueImpl queue) {
+ Preconditions.checkState(currentQueue.equals(queue));
+ scheduleFlush();
+ }
+
+ /**
+ * Periodic barrier check.
+ */
+ protected void barrier() {
+ LOG.debug("Channel {} barrier timer expired", parent.getChannel());
+ if (currentQueue == null) {
+ LOG.debug("Channel shut down, not processing barrier");
+ return;
+ }
+
+ final long now = System.nanoTime();
+ final long sinceLast = now - lastBarrierNanos;
+ if (sinceLast >= maxBarrierNanos) {
+ LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
+ // FIXME: we should be tracking requests/responses instead of this
+ if (nonBarrierMessages == 0) {
+ LOG.debug("No messages written since last barrier, not issuing one");
+ } else {
+ scheduleBarrierMessage();
+ }
+ }
+
+ scheduleBarrierTimer(now);
+ }
+
+ /**
+ * Perform a single flush operation.
+ */
+ protected void flush() {
+ final long start = System.nanoTime();
+ final long deadline = start + maxWorkTime;
+
+ LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
+
+ long messages = 0;
+ for (;; ++messages) {
+ if (!parent.getChannel().isWritable()) {
+ LOG.trace("Channel is no longer writable");
+ break;
+ }
+
+ final OfHeader message = flushEntry(start);
+ if (message == null) {
+ LOG.trace("The queue is completely drained");
+ break;
+ }
+
+ final Object wrapper;
+ if (address == null) {
+ wrapper = new MessageListenerWrapper(message, null);
+ } else {
+ wrapper = new UdpMessageListenerWrapper(message, null, address);
+ }
+ parent.getChannel().write(wrapper);
+
+ /*
+ * Check every WORKTIME_RECHECK_MSGS for exceeded time.
+ *
+ * XXX: given that we already measure our flushing throughput, we
+ * should be able to perform dynamic adjustments here.
+ * Is that additional complexity needed, though?
+ */
+ if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
+ LOG.trace("Exceeded allotted work time {}us",
+ TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
+ break;
+ }
+ }
+
+ if (messages > 0) {
+ LOG.debug("Flushing {} message(s) to channel {}", messages, parent.getChannel());
+ parent.getChannel().flush();
+ }
+
+ final long stop = System.nanoTime();
+ LOG.debug("Flushed {} messages in {}us to channel {}",
+ messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
+
+ /*
+ * We are almost ready to terminate. This is a bit tricky, because
+ * we do not want to have a race window where a message would be
+ * stuck on the queue without a flush being scheduled.
+ *
+ * So we mark ourselves as not running and then re-check whether a
+ * flush is needed. That will re-synchronize with other threads
+ * such that only one flush is scheduled at any given time.
+ */
+ if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
+ LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
+ }
+
+ conditionalFlush();
+ }
+
+
+ /**
+ * Schedule a queue flush if it is not empty and the channel is found
+ * to be writable. May only be called from Netty context.
+ */
+ private void conditionalFlush() {
+ if (!currentQueue.isEmpty()) {
+ scheduleFlush();
+ } else {
+ LOG.trace("Queue is empty, no flush needed");
+ }
+ }
+
+ private void conditionalFlush(final ChannelHandlerContext ctx) {
+ Preconditions.checkState(ctx.channel().equals(parent.getChannel()), "Inconsistent channel %s with context %s", parent.getChannel(), ctx);
+ conditionalFlush();
+ }
+
+ @Override
+ public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+ super.channelActive(ctx);
+ conditionalFlush(ctx);
+ }
+
+ @Override
+ public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
+ super.channelWritabilityChanged(ctx);
+ conditionalFlush(ctx);
+ }
+
+ @Override
+ public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ super.channelInactive(ctx);
+
+ long entries = 0;
+ LOG.debug("Channel shutdown, flushing queue...");
+ handler.onConnectionQueueChanged(null);
+
+ final Throwable cause = new RejectedExecutionException("Channel disconnected");
+ for (OutboundQueueImpl queue : activeQueues) {
+ entries += queue.failAll(cause);
+ }
+ activeQueues.clear();
+
+ LOG.debug("Flushed {} queue entries", entries);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled);
+ }
+}