From 95b103939343f1fc851c67899f707fffade70ef1 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 11 May 2015 21:18:29 +0200 Subject: [PATCH] BUG-3219: Introduce OutboundQueueHandler and related interfaces This patch introduces a barrier-enabled outbound queue. The queue works in reserve/commit manner and notifies users with the result of the requests. XID allocation occurs as an offset in a particular queue. Change-Id: Icd3ceda34746be0346ac59b8ed46352db3be9a2f Signed-off-by: Robert Varga (cherry picked from commit 0d3ddb408c6a37ff48f98c7e7d9dd00808b4c486) --- .../api/connection/ConnectionAdapter.java | 15 +- .../api/connection/OutboundQueue.java | 40 ++ .../api/connection/OutboundQueueHandler.java | 37 ++ .../OutboundQueueHandlerRegistration.java | 23 + .../connection/ConnectionAdapterImpl.java | 70 ++- .../core/connection/OutboundQueueEntry.java | 90 ++++ .../OutboundQueueHandlerRegistrationImpl.java | 18 + .../core/connection/OutboundQueueImpl.java | 239 ++++++++++ .../core/connection/OutboundQueueManager.java | 424 ++++++++++++++++++ 9 files changed, 934 insertions(+), 22 deletions(-) create mode 100644 openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueue.java create mode 100644 openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueueHandler.java create mode 100644 openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueueHandlerRegistration.java create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueHandlerRegistrationImpl.java create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java diff --git a/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionAdapter.java b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionAdapter.java index a205e5e6..02679f05 100644 --- a/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionAdapter.java +++ b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionAdapter.java @@ -5,13 +5,11 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - - package org.opendaylight.openflowjava.protocol.api.connection; +import com.google.common.annotations.Beta; import java.net.InetSocketAddress; import java.util.concurrent.Future; - import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolService; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener; @@ -73,4 +71,15 @@ public interface ConnectionAdapter extends OpenflowProtocolService { * @return true, if channel is configured to autoread */ boolean isAutoRead(); + + /** + * Registers a new bypass outbound queue + * @param handler + * @param maxQueueDepth + * @param maxBarrierNanos + * @return An {@link OutboundQueueHandlerRegistration} + */ + @Beta + OutboundQueueHandlerRegistration registerOutboundQueueHandler(T handler, + int maxQueueDepth, long maxBarrierNanos); } diff --git a/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueue.java b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueue.java new file mode 100644 index 00000000..1291c22c --- /dev/null +++ b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueue.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowjava.protocol.api.connection; + +import com.google.common.annotations.Beta; +import com.google.common.util.concurrent.FutureCallback; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; + +@Beta +public interface OutboundQueue { + /** + * Reserve an entry in the outbound queue. + * @return XID for the new message, or null if the queue is full + */ + Long reserveEntry(); + + /** + * Commit the specified offset using a message. Specified callback will + * be invoked once we know how it has resolved, either with a normal response, + * implied completion via a barrier, or failure (such as connection drop). For + * multipart responses, {@link FutureCallback#onSuccess(Object)} will be invoked + * multiple times as the corresponding responses arrive. If the request is completed + * with a response, the object reported will be non-null. If the request's completion + * is implied by a barrier, the object reported will be null. + * + * @param xid Previously-reserved XID + * @param message Message which should be sent out, or null if the reservation + * should be cancelled. + * @param callback Callback to be invoked, or null if no callback should be invoked. + * @throws IllegalArgumentException if the slot is already committed or was never reserved. + */ + void commitEntry(@Nonnull Long xid, @Nullable OfHeader message, @Nullable FutureCallback callback); +} diff --git a/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueueHandler.java b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueueHandler.java new file mode 100644 index 00000000..08007985 --- /dev/null +++ b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueueHandler.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowjava.protocol.api.connection; + +import com.google.common.annotations.Beta; +import javax.annotation.Nonnull; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; + +/** + * Handler of the outbound queue. The queue has a maximum depth assigned when the + * handler is registered. + */ +@Beta +public interface OutboundQueueHandler { + /** + * Create a new {@link BarrierInput barrier} message. This callback is invoked + * when the queue is being flushed to the switch. The barrier ensures that any + * outstanding requests are detected as either completed or failed. + * + * @param xid XID for the barrier message + * @return New barrier message. + */ + @Nonnull BarrierInput createBarrierRequest(@Nonnull Long xid); + + /** + * Invoked whenever the underlying queue is refreshed. Implementations should + * ensure they are talking to the latest queue + * @param queue New queue instance, null indicates a shutdown, e.g. the queue + * is no longer available. + */ + void onConnectionQueueChanged(OutboundQueue queue); +} diff --git a/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueueHandlerRegistration.java b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueueHandlerRegistration.java new file mode 100644 index 00000000..d31d1abb --- /dev/null +++ b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueueHandlerRegistration.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowjava.protocol.api.connection; + +import com.google.common.annotations.Beta; +import org.opendaylight.yangtools.concepts.ObjectRegistration; + +/** + * An {@link ObjectRegistration} of a {@link OutboundQueueHandler}. Registration can be cancelled + * by invoking {@link #close()}. + * + * @param Handler type + */ +@Beta +public interface OutboundQueueHandlerRegistration extends ObjectRegistration { + @Override + void close(); +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java index a38e1700..20e4c2a8 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java @@ -9,16 +9,24 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection; +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalCause; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.GenericFutureListener; - import java.net.InetSocketAddress; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; - import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; import org.opendaylight.openflowjava.statistics.CounterEventTypes; import org.opendaylight.openflowjava.statistics.StatisticsCounters; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; @@ -66,15 +74,6 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalCause; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; - /** * Handles messages (notifications + rpcs) and connections * @author mirehak @@ -115,8 +114,10 @@ public class ConnectionAdapterImpl implements ConnectionFacade { private ConnectionReadyListener connectionReadyListener; private OpenflowProtocolListener messageListener; private SystemNotificationsListener systemListener; + private OutboundQueueManager outputManager; private boolean disconnectOccured = false; - private StatisticsCounters statisticsCounters; + private final StatisticsCounters statisticsCounters; + private final InetSocketAddress address; /** * default ctor @@ -131,6 +132,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade { .removalListener(REMOVAL_LISTENER).build(); this.channel = Preconditions.checkNotNull(channel); this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH, address); + this.address = address; channel.pipeline().addLast(output); statisticsCounters = StatisticsCounters.getInstance(); LOG.debug("ConnectionAdapter created"); @@ -285,6 +287,9 @@ public class ConnectionAdapterImpl implements ConnectionFacade { statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof ExperimenterMessage) { messageListener.onExperimenterMessage((ExperimenterMessage) message); + if (outputManager != null) { + outputManager.onMessage((OfHeader) message); + } statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof FlowRemovedMessage) { messageListener.onFlowRemovedMessage((FlowRemovedMessage) message); @@ -295,6 +300,9 @@ public class ConnectionAdapterImpl implements ConnectionFacade { statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof MultipartReplyMessage) { messageListener.onMultipartReplyMessage((MultipartReplyMessage) message); + if (outputManager != null) { + outputManager.onMessage((OfHeader) message); + } statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof PacketInMessage) { messageListener.onPacketInMessage((PacketInMessage) message); @@ -305,9 +313,10 @@ public class ConnectionAdapterImpl implements ConnectionFacade { } else { LOG.warn("message listening not supported for type: {}", message.getClass()); } - } else { - if (message instanceof OfHeader) { - LOG.debug("OFheader msg received"); + } else if (message instanceof OfHeader) { + LOG.debug("OFheader msg received"); + + if (outputManager == null || !outputManager.onMessage((OfHeader) message)) { RpcResponseKey key = createRpcResponseKey((OfHeader) message); final ResponseExpectedRpcListener listener = findRpcResponse(key); if (listener != null) { @@ -319,9 +328,9 @@ public class ConnectionAdapterImpl implements ConnectionFacade { } else { LOG.warn("received unexpected rpc response: {}", key); } - } else { - LOG.warn("message listening not supported for type: {}", message.getClass()); } + } else { + LOG.warn("message listening not supported for type: {}", message.getClass()); } } @@ -466,7 +475,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade { * Used only for testing purposes * @param cache */ - public void setResponseCache(Cache> cache) { + public void setResponseCache(final Cache> cache) { this.responseCache = cache; } @@ -476,7 +485,30 @@ public class ConnectionAdapterImpl implements ConnectionFacade { } @Override - public void setAutoRead(boolean autoRead) { + public void setAutoRead(final boolean autoRead) { channel.config().setAutoRead(autoRead); } + + @Override + public OutboundQueueHandlerRegistration registerOutboundQueueHandler( + final T handler, final int maxQueueDepth, final long maxBarrierNanos) { + Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager); + + final OutboundQueueManager ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos); + outputManager = ret; + channel.pipeline().addLast(outputManager); + + return new OutboundQueueHandlerRegistrationImpl(handler) { + @Override + protected void removeRegistration() { + outputManager.close(); + channel.pipeline().remove(outputManager); + outputManager = null; + } + }; + } + + Channel getChannel() { + return channel; + } } diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java new file mode 100644 index 00000000..c0c2f764 --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowjava.protocol.impl.core.connection; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class OutboundQueueEntry { + private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueEntry.class); + private FutureCallback callback; + private OfHeader message; + private boolean completed; + private volatile boolean committed; + + void commit(final OfHeader message, final FutureCallback callback) { + this.message = message; + this.callback = callback; + + // Volatile write, needs to be last + committed = true; + } + + void reset() { + callback = null; + message = null; + completed = false; + + // Volatile write, needs to be last + committed = false; + } + + boolean isBarrier() { + return message instanceof BarrierInput; + } + + boolean isCommitted() { + return committed; + } + + boolean isCompleted() { + return completed; + } + + OfHeader getMessage() { + return message; + } + + boolean complete(final OfHeader response) { + Preconditions.checkState(!completed, "Attempted to complete a completed message %s with response %s", message, response); + + // Multipart requests are special, we have to look at them to see + // if there is something outstanding and adjust ourselves accordingly + final boolean reallyComplete; + if (response instanceof MultipartReplyMessage) { + reallyComplete = !((MultipartReplyMessage) response).getFlags().isOFPMPFREQMORE(); + LOG.debug("Multipart reply {}", response); + } else { + reallyComplete = true; + } + + completed = reallyComplete; + if (callback != null) { + callback.onSuccess(response); + } + LOG.debug("Entry {} completed {} with response {}", this, completed, response); + return reallyComplete; + } + + void fail(final Throwable cause) { + if (!completed) { + completed = true; + if (callback != null) { + callback.onFailure(cause); + } + } else { + LOG.warn("Ignoring failure {} for completed message {}", cause, message); + } + } + +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueHandlerRegistrationImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueHandlerRegistrationImpl.java new file mode 100644 index 00000000..697d2ddd --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueHandlerRegistrationImpl.java @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowjava.protocol.impl.core.connection; + +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; +import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; + +abstract class OutboundQueueHandlerRegistrationImpl extends AbstractObjectRegistration implements OutboundQueueHandlerRegistration { + protected OutboundQueueHandlerRegistrationImpl(final T instance) { + super(instance); + } +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java new file mode 100644 index 00000000..1bb9ec4b --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java @@ -0,0 +1,239 @@ +/* + * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowjava.protocol.impl.core.connection; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.locks.LockSupport; +import javax.annotation.Nonnull; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class OutboundQueueImpl implements OutboundQueue { + private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class); + private static final AtomicIntegerFieldUpdater CURRENT_OFFSET_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset"); + private static final long FLUSH_RETRY_NANOS = 1L; + private final OutboundQueueManager manager; + private final OutboundQueueEntry[] queue; + private final long baseXid; + private final long endXid; + private final int reserve; + + // Updated concurrently + private volatile int reserveOffset; + + // Updated from Netty only + private int flushOffset; + private int completeCount; + + OutboundQueueImpl(final OutboundQueueManager manager, final long baseXid, final int maxQueue) { + /* + * We use the last entry as an emergency should a timeout-triggered + * flush request race with normal users for the last entry in this + * queue. In that case the flush request will take the last entry and + * schedule a flush, which means that we will get around sending the + * message as soon as the user finishes the reservation. + */ + Preconditions.checkArgument(maxQueue > 1); + this.baseXid = baseXid; + this.endXid = baseXid + maxQueue; + this.reserve = maxQueue - 1; + this.manager = Preconditions.checkNotNull(manager); + queue = new OutboundQueueEntry[maxQueue]; + for (int i = 0; i < maxQueue; ++i) { + queue[i] = new OutboundQueueEntry(); + } + } + + private OutboundQueueImpl(final OutboundQueueManager manager, final long baseXid, final OutboundQueueEntry[] queue) { + this.manager = Preconditions.checkNotNull(manager); + this.queue = Preconditions.checkNotNull(queue); + this.baseXid = baseXid; + this.endXid = baseXid + queue.length; + this.reserve = queue.length - 1; + for (OutboundQueueEntry element : queue) { + element.reset(); + } + } + + OutboundQueueImpl reuse(final long baseXid) { + return new OutboundQueueImpl(manager, baseXid, queue); + } + + Long reserveEntry(final boolean forBarrier) { + final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this); + if (offset >= reserve) { + if (forBarrier) { + LOG.debug("Queue {} offset {}/{}, using emergency slot", this, offset, queue.length); + return endXid; + } else { + LOG.debug("Queue {} offset {}/{}, not allowing reservation", this, offset, queue.length); + return null; + } + } + + final Long xid = baseXid + offset; + LOG.debug("Queue {} allocated XID {} at offset {}", this, xid, offset); + return xid; + } + + @Override + public Long reserveEntry() { + return reserveEntry(false); + } + + @Override + public void commitEntry(final Long xid, final OfHeader message, final FutureCallback callback) { + final int offset = (int)(xid - baseXid); + if (message != null) { + Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid); + } + + queue[offset].commit(message, callback); + LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset); + + manager.ensureFlushing(this); + } + + /** + * An empty queue is a queue which has no further unflushed entries. + * + * @return True if this queue does not have unprocessed entries. + */ + boolean isEmpty() { + int ro = reserveOffset; + if (ro >= reserve) { + if (queue[reserve].isCommitted()) { + ro = reserve + 1; + } else { + ro = reserve; + } + } + + LOG.debug("Effective flush/reserve offset {}/{}", flushOffset, ro); + return ro <= flushOffset; + } + + /** + * A queue is finished when all of its entries have been completed. + * + * @return False if there are any uncompleted requests. + */ + boolean isFinished() { + if (completeCount < reserve) { + return false; + } + + // We need to check if the last entry was used + final OutboundQueueEntry last = queue[reserve]; + return !last.isCommitted() || last.isCompleted(); + } + + boolean isFlushed() { + LOG.debug("Check queue {} for completeness (offset {}, reserve {})", flushOffset, reserve); + if (flushOffset < reserve) { + return false; + } + + // flushOffset implied == reserve + return flushOffset >= queue.length || !queue[reserve].isCommitted(); + } + + OfHeader flushEntry() { + for (;;) { + // No message ready + if (isEmpty()) { + LOG.debug("Flush offset {} is uptodate with reserved", flushOffset); + return null; + } + + boolean retry = true; + while (!queue[flushOffset].isCommitted()) { + if (!retry) { + LOG.debug("Offset {} not ready yet, giving up", flushOffset); + return null; + } + + LOG.debug("Offset {} not ready yet, retrying", flushOffset); + LockSupport.parkNanos(FLUSH_RETRY_NANOS); + retry = false; + } + + final OfHeader msg = queue[flushOffset++].getMessage(); + if (msg != null) { + return msg; + } + } + } + + private boolean xidInRance(final long xid) { + return xid < endXid && (xid >= baseXid || baseXid > endXid); + } + + /** + * Return the request entry corresponding to a response. Returns null + * if there is no request matching the response. + * + * @param response Response message + * @return Matching request entry, or null if no match is found. + */ + OutboundQueueEntry pairRequest(@Nonnull final OfHeader response) { + final Long xid = response.getXid(); + if (!xidInRance(xid)) { + LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, queue.length, xid); + return null; + } + + final int offset = (int)(xid - baseXid); + final OutboundQueueEntry entry = queue[offset]; + if (entry.isCompleted()) { + LOG.debug("Entry {} already is completed, not accepting response {}", entry, response); + return null; + } + + if (entry.complete(response)) { + completeCount++; + + // This has been a barrier -- make sure we complete all preceding requests + if (entry.isBarrier()) { + LOG.debug("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid, xid - 1); + for (int i = 0; i < offset; ++i) { + final OutboundQueueEntry e = queue[i]; + if (!e.isCompleted() && e.complete(null)) { + completeCount++; + } + } + } + } + return entry; + } + + void completeAll() { + for (OutboundQueueEntry entry : queue) { + if (!entry.isCompleted() && entry.complete(null)) { + completeCount++; + } + } + } + + int failAll(final Throwable cause) { + int ret = 0; + for (OutboundQueueEntry entry : queue) { + if (!entry.isCompleted()) { + entry.fail(cause); + ret++; + } + } + + return ret; + } +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java new file mode 100644 index 00000000..b7c2c8f9 --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java @@ -0,0 +1,424 @@ +/* + * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowjava.protocol.impl.core.connection; + +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import java.net.InetSocketAddress; +import java.util.ArrayDeque; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class OutboundQueueManager extends ChannelInboundHandlerAdapter implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class); + + /** + * This is the default upper bound we place on the flush task running + * a single iteration. We relinquish control after about this amount + * of time. + */ + private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100); + + /** + * We re-check the time spent flushing every this many messages. We do this because + * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE + * or similar to disable the feature. + */ + private static final int WORKTIME_RECHECK_MSGS = 64; + + /** + * We maintain a cache of this many previous queues for later reuse. + */ + private static final int QUEUE_CACHE_SIZE = 4; + + private final Queue queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE); + private final Queue activeQueues = new LinkedList<>(); + private final ConnectionAdapterImpl parent; + private final InetSocketAddress address; + private final long maxBarrierNanos; + private final long maxWorkTime; + private final int queueSize; + private final T handler; + + /* + * Instead of using an AtomicBoolean object, we use these two. It saves us + * from allocating an extra object. + */ + @SuppressWarnings("rawtypes") + private static final AtomicIntegerFieldUpdater FLUSH_SCHEDULED_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(OutboundQueueManager.class, "flushScheduled"); + private volatile int flushScheduled = 0; + + // Updated from netty only + private long lastBarrierNanos = System.nanoTime(); + private OutboundQueueImpl currentQueue; + private int nonBarrierMessages; + private long lastXid = 0; + + // Passed to executor to request triggering of flush + private final Runnable flushRunnable = new Runnable() { + @Override + public void run() { + flush(); + } + }; + private final Runnable barrierRunnable = new Runnable() { + @Override + public void run() { + barrier(); + } + }; + + OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler, + final int queueSize, final long maxBarrierNanos) { + this.parent = Preconditions.checkNotNull(parent); + this.handler = Preconditions.checkNotNull(handler); + Preconditions.checkArgument(queueSize > 0); + this.queueSize = queueSize; + Preconditions.checkArgument(maxBarrierNanos > 0); + this.maxBarrierNanos = maxBarrierNanos; + this.address = address; + this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS); + + LOG.debug("Queue manager instantiated with queue size {}", queueSize); + createQueue(); + scheduleBarrierTimer(lastBarrierNanos); + } + + T getHandler() { + return handler; + } + + @Override + public void close() { + handler.onConnectionQueueChanged(null); + } + + private void retireQueue(final OutboundQueueImpl queue) { + if (queueCache.offer(queue)) { + LOG.debug("Saving queue {} for later reuse", queue); + } else { + LOG.debug("Queue {} thrown away", queue); + } + } + + private void createQueue() { + final long baseXid = lastXid; + lastXid += queueSize + 1; + + final OutboundQueueImpl cached = queueCache.poll(); + final OutboundQueueImpl queue; + if (cached != null) { + queue = cached.reuse(baseXid); + LOG.debug("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel()); + } else { + queue = new OutboundQueueImpl(this, baseXid, queueSize + 1); + LOG.debug("Allocated new queue {} on channel {}", queue, parent.getChannel()); + } + + activeQueues.add(queue); + currentQueue = queue; + handler.onConnectionQueueChanged(queue); + } + + private void scheduleBarrierTimer(final long now) { + long next = lastBarrierNanos + maxBarrierNanos; + if (next < now) { + LOG.debug("Attempted to schedule barrier in the past, reset maximum)"); + next = now + maxBarrierNanos; + } + + final long delay = next - now; + LOG.debug("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay)); + parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS); + } + + private void scheduleBarrierMessage() { + final Long xid = currentQueue.reserveEntry(true); + Verify.verifyNotNull(xid); + + currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null); + LOG.debug("Barrier XID {} scheduled", xid); + + // We can see into the future when compared to flushEntry(), as that + // codepath may be lagging behind on messages. Resetting the counter + // here ensures that flushEntry() will not attempt to issue a flush + // request. Note that we do not reset current time, as that should + // reflect when we sent the message for real. + nonBarrierMessages = 0; + } + + /** + * Flush an entry from the queue. + * + * @param now Time reference for 'now'. We take this as an argument, as + * we need a timestamp to mark barrier messages we see swinging + * by. That timestamp does not need to be completely accurate, + * hence we use the flush start time. Alternative would be to + * measure System.nanoTime() for each barrier -- needlessly + * adding overhead. + * + * @return Entry which was flushed, null if no entry is ready. + */ + OfHeader flushEntry(final long now) { + final OfHeader message = currentQueue.flushEntry(); + if (currentQueue.isFlushed()) { + LOG.debug("Queue {} is fully flushed", currentQueue); + createQueue(); + } + + if (message == null) { + return null; + } + + if (message instanceof BarrierInput) { + LOG.debug("Barrier message seen, resetting counters"); + nonBarrierMessages = 0; + lastBarrierNanos = now; + } else { + nonBarrierMessages++; + if (nonBarrierMessages >= queueSize) { + LOG.debug("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages); + scheduleBarrierMessage(); + } + } + + return message; + } + + /** + * Invoked whenever a message comes in from the switch. Runs matching + * on all active queues in an attempt to complete a previous request. + * + * @param message Potential response message + * @return True if the message matched a previous request, false otherwise. + */ + boolean onMessage(final OfHeader message) { + LOG.debug("Attempting to pair message {} to a request", message); + + Iterator it = activeQueues.iterator(); + while (it.hasNext()) { + final OutboundQueueImpl queue = it.next(); + final OutboundQueueEntry entry = queue.pairRequest(message); + + if (entry == null) { + continue; + } + + LOG.debug("Queue {} accepted response {}", queue, message); + + // This has been a barrier request, we need to flush all + // previous queues + if (entry.isBarrier() && activeQueues.size() > 1) { + LOG.debug("Queue {} indicated request was a barrier", queue); + + it = activeQueues.iterator(); + while (it.hasNext()) { + final OutboundQueueImpl q = it.next(); + + // We want to complete all queues before the current one, we will + // complete the current queue below + if (!queue.equals(q)) { + LOG.debug("Queue {} is implied finished", q); + q.completeAll(); + it.remove(); + retireQueue(q); + } else { + break; + } + } + } + + if (queue.isFinished()) { + LOG.debug("Queue {} is finished", queue); + it.remove(); + retireQueue(queue); + } + + return true; + } + + LOG.debug("Failed to find completion for message {}", message); + return false; + } + + private void scheduleFlush() { + if (parent.getChannel().isWritable()) { + if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) { + LOG.trace("Scheduling flush task on channel {}", parent.getChannel()); + parent.getChannel().eventLoop().execute(flushRunnable); + } else { + LOG.trace("Flush task is already present on channel {}", parent.getChannel()); + } + } else { + LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel()); + } + } + + void ensureFlushing(final OutboundQueueImpl queue) { + Preconditions.checkState(currentQueue.equals(queue)); + scheduleFlush(); + } + + /** + * Periodic barrier check. + */ + protected void barrier() { + LOG.debug("Channel {} barrier timer expired", parent.getChannel()); + if (currentQueue == null) { + LOG.debug("Channel shut down, not processing barrier"); + return; + } + + final long now = System.nanoTime(); + final long sinceLast = now - lastBarrierNanos; + if (sinceLast >= maxBarrierNanos) { + LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast); + // FIXME: we should be tracking requests/responses instead of this + if (nonBarrierMessages == 0) { + LOG.debug("No messages written since last barrier, not issuing one"); + } else { + scheduleBarrierMessage(); + } + } + + scheduleBarrierTimer(now); + } + + /** + * Perform a single flush operation. + */ + protected void flush() { + final long start = System.nanoTime(); + final long deadline = start + maxWorkTime; + + LOG.debug("Dequeuing messages to channel {}", parent.getChannel()); + + long messages = 0; + for (;; ++messages) { + if (!parent.getChannel().isWritable()) { + LOG.trace("Channel is no longer writable"); + break; + } + + final OfHeader message = flushEntry(start); + if (message == null) { + LOG.trace("The queue is completely drained"); + break; + } + + final Object wrapper; + if (address == null) { + wrapper = new MessageListenerWrapper(message, null); + } else { + wrapper = new UdpMessageListenerWrapper(message, null, address); + } + parent.getChannel().write(wrapper); + + /* + * Check every WORKTIME_RECHECK_MSGS for exceeded time. + * + * XXX: given we already measure our flushing throughput, we + * should be able to perform dynamic adjustments here. + * is that additional complexity needed, though? + */ + if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) { + LOG.trace("Exceeded allotted work time {}us", + TimeUnit.NANOSECONDS.toMicros(maxWorkTime)); + break; + } + } + + if (messages > 0) { + LOG.debug("Flushing {} message(s) to channel {}", messages, parent.getChannel()); + parent.getChannel().flush(); + } + + final long stop = System.nanoTime(); + LOG.debug("Flushed {} messages in {}us to channel {}", + messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel()); + + /* + * We are almost ready to terminate. This is a bit tricky, because + * we do not want to have a race window where a message would be + * stuck on the queue without a flush being scheduled. + * + * So we mark ourselves as not running and then re-check if a + * flush out is needed. That will re-synchronized with other threads + * such that only one flush is scheduled at any given time. + */ + if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) { + LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this); + } + + conditionalFlush(); + } + + + /** + * Schedule a queue flush if it is not empty and the channel is found + * to be writable. May only be called from Netty context. + */ + private void conditionalFlush() { + if (!currentQueue.isEmpty()) { + scheduleFlush(); + } else { + LOG.trace("Queue is empty, no flush needed"); + } + } + + private void conditionalFlush(final ChannelHandlerContext ctx) { + Preconditions.checkState(ctx.channel().equals(parent.getChannel()), "Inconsistent channel %s with context %s", parent.getChannel(), ctx); + conditionalFlush(); + } + + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + conditionalFlush(ctx); + } + + @Override + public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception { + super.channelWritabilityChanged(ctx); + conditionalFlush(ctx); + } + + @Override + public void channelInactive(final ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + + long entries = 0; + LOG.debug("Channel shutdown, flushing queue..."); + handler.onConnectionQueueChanged(null); + + final Throwable cause = new RejectedExecutionException("Channel disconnected"); + for (OutboundQueueImpl queue : activeQueues) { + entries += queue.failAll(cause); + } + activeQueues.clear(); + + LOG.debug("Flushed {} queue entries", entries); + } + + @Override + public String toString() { + return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled); + } +} -- 2.36.6