import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-abstract class AbstractRpcListener<T> implements GenericFutureListener<Future<? super Void>> {
+/**
+ * This class holds all the context we need for sending a single message down
+ * the tube: it acts both as the MessageHolder (used in the queue) and as the
+ * write-completion listener. It is not a thing of beauty, but it keeps us
+ * from allocating unnecessary objects in the egress path.
+ */
+abstract class AbstractRpcListener<T> implements GenericFutureListener<Future<Void>>, ChannelOutboundQueue.MessageHolder<Object> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractRpcListener.class);
private final SettableFuture<RpcResult<T>> result = SettableFuture.create();
private final String failureInfo;
+ private Object message;
- AbstractRpcListener(final String failureInfo) {
- this.failureInfo = failureInfo;
+ AbstractRpcListener(final Object message, final String failureInfo) {
+ this.failureInfo = Preconditions.checkNotNull(failureInfo);
+ this.message = Preconditions.checkNotNull(message);
}
public final ListenableFuture<RpcResult<T>> getResult() {
    return result;
}
@Override
- public final void operationComplete(final Future<? super Void> future) {
+ public final void operationComplete(final Future<Void> future) {
if (!future.isSuccess()) {
LOG.debug("operation failed");
failedRpc(future.cause());
}
}
+ @Override
+ public final Object takeMessage() {
+ final Object ret = message;
+ Preconditions.checkState(ret != null, "Message has already been taken");
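+ // Drop our reference so the holder does not keep the message alive
+ // after ownership has passed to the caller.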
+ message = null;
+ return ret;
+ }
+
+ @Override
+ public final GenericFutureListener<Future<Void>> takeListener() {
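+ // We double as our own listener (see class javadoc), so no separate
+ // listener object needs to be allocated.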
+ return this;
+ }
+
protected abstract void operationSuccessful();
protected final void failedRpc(final Throwable cause) {
final RpcError rpcError = ConnectionAdapterImpl.buildRpcError(
failureInfo, ErrorSeverity.ERROR, "check switch connection", cause);
-
result.set(Rpcs.getRpcResult(
false,
(T)null,
Collections.singletonList(rpcError)));
}
protected final void successfulRpc(final T value) {
-
result.set(Rpcs.getRpcResult(
true,
value,
Collections.<RpcError>emptyList()));
}
-
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowjava.protocol.impl.connection;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+
+/**
+ * Channel handler which wraps on top of the normal Netty pipeline, allowing
+ * writes to be enqueued from any thread. It then schedules a pipeline task,
+ * which shuffles messages from the queue into the pipeline.
+ *
+ * Note this is an *Inbound* handler, as it reacts to channel writability changing,
+ * which in the Netty vocabulary is an inbound event. This has already changed in
+ * the Netty 5.0.0 API, where Handlers are unified.
+ */
+final class ChannelOutboundQueue extends ChannelInboundHandlerAdapter {
+ public interface MessageHolder<T> {
+ /**
+ * Take ownership of the encapsulated listener. Guaranteed to
+ * be called at most once.
+ *
+ * @return listener encapsulated in the holder, may be null
+ * @throws IllegalStateException if the listener is no longer
+ * available (for example because it has already been
+ * taken).
+ */
+ GenericFutureListener<Future<Void>> takeListener();
+
+ /**
+ * Take ownership of the encapsulated message. Guaranteed to be
+ * called at most once.
+ *
+ * @return message encapsulated in the holder, may not be null
+ * @throws IllegalStateException if the message is no longer
+ * available (for example because it has already been
+ * taken).
+ */
+ T takeMessage();
+ }
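+
+ /*
+ * Illustrative only, not part of this change: a minimal stand-alone holder
+ * carrying a message and no listener could look like this (the name
+ * OneShotHolder is hypothetical; AbstractRpcListener is the implementation
+ * actually used by this patch):
+ *
+ *   final class OneShotHolder implements MessageHolder<Object> {
+ *       private Object msg;
+ *
+ *       OneShotHolder(final Object msg) {
+ *           this.msg = Preconditions.checkNotNull(msg);
+ *       }
+ *
+ *       @Override
+ *       public Object takeMessage() {
+ *           final Object ret = msg;
+ *           Preconditions.checkState(ret != null, "Message already taken");
+ *           msg = null;
+ *           return ret;
+ *       }
+ *
+ *       @Override
+ *       public GenericFutureListener<Future<Void>> takeListener() {
+ *           return null;
+ *       }
+ *   }
+ */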
+
+ /**
+ * This is the default upper bound we place on the flush task running
+ * a single iteration. We relinquish control after about this amount
+ * of time.
+ */
+ private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
+
+ /**
+ * We re-check the time spent flushing after every WORKTIME_RECHECK_MSGS
+ * messages, because checking after each message may prove to be
+ * CPU-intensive. Set to Integer.MAX_VALUE
+ * or similar to disable the feature.
+ */
+ private static final int WORKTIME_RECHECK_MSGS = 64;
+ private static final Logger LOG = LoggerFactory.getLogger(ChannelOutboundQueue.class);
+
+ // Passed to executor to request triggering of flush
+ private final Runnable flushRunnable = new Runnable() {
+ @Override
+ public void run() {
+ ChannelOutboundQueue.this.flush();
+ }
+ };
+
+ /*
+ * Instead of using an AtomicBoolean object, we use these two. It saves us
+ * from allocating an extra object.
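+ * The compareAndSet(this, 0, 1) / compareAndSet(this, 1, 0) calls below are
+ * semantically equivalent to AtomicBoolean.compareAndSet(false, true) and
+ * compareAndSet(true, false), respectively.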
+ */
+ private static final AtomicIntegerFieldUpdater<ChannelOutboundQueue> FLUSH_SCHEDULED_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundQueue.class, "flushScheduled");
+ private volatile int flushScheduled = 0;
+
+ private final Queue<MessageHolder<?>> queue;
+ private final long maxWorkTime;
+ private final Channel channel;
+
+ public ChannelOutboundQueue(final Channel channel, final int queueDepth) {
+ Preconditions.checkArgument(queueDepth > 0, "Queue depth has to be positive");
+
+ /*
+ * This looks like a good trade-off for throughput. Alternative is
+ * to use an ArrayBlockingQueue -- but that uses a single lock to
+ * synchronize both producers and consumers, potentially leading
+ * to less throughput.
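+ *
+ * (LinkedBlockingQueue internally maintains separate put and take locks,
+ * so producer threads and the flushing consumer do not contend on a
+ * single monitor.)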
+ */
+ this.queue = new LinkedBlockingQueue<>(queueDepth);
+ this.channel = Preconditions.checkNotNull(channel);
+ this.maxWorkTime = DEFAULT_WORKTIME_MICROS;
+ }
+
+ /**
+ * Enqueue a message holder for transmission. This is the thread-safe entry
+ * point for the channel. If the holder cannot be placed on the queue, this
+ * method returns false and the message is not sent.
+ *
+ * @param holder MessageHolder which should be enqueued
+ * @return Success indicator, true if the enqueue operation succeeded,
+ * false if the queue is full.
+ */
+ public boolean enqueue(final MessageHolder<?> holder) {
+ LOG.trace("Enqueuing message {}", holder);
+ if (queue.offer(holder)) {
+ LOG.trace("Message enqueued");
+ conditionalFlush();
+ return true;
+ }
+
+ LOG.trace("Message queue is full");
+ return false;
+ }
+
+ private void scheduleFlush(final EventExecutor executor) {
+ if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
+ LOG.trace("Scheduling flush task");
+ executor.execute(flushRunnable);
+ } else {
+ LOG.trace("Flush task is already present");
+ }
+ }
+
+ /**
+ * Schedule a queue flush if the queue is non-empty and the channel
+ * is currently writable.
+ */
+ private void conditionalFlush() {
+ if (queue.isEmpty()) {
+ LOG.trace("Queue is empty, not flush needed");
+ return;
+ }
+ if (!channel.isWritable()) {
+ LOG.trace("Channel {} is not writable, not issuing a flush", channel);
+ return;
+ }
+
+ scheduleFlush(channel.pipeline().lastContext().executor());
+ }
+
+ /*
+ * The synchronized keyword should be unnecessary, really, but it enforces
+ * queue order should something go terribly wrong. It should be completely
+ * uncontended.
+ */
+ private synchronized void flush() {
+ final Stopwatch w = new Stopwatch().start();
+
+ LOG.debug("Dequeuing messages to channel {}", channel);
+
+ long messages = 0;
+ for (;; ++messages) {
+ if (!channel.isWritable()) {
+ LOG.trace("Channel is no longer writable");
+ break;
+ }
+
+ final MessageHolder<?> h = queue.poll();
+ if (h == null) {
+ LOG.trace("The queue is completely drained");
+ break;
+ }
+
+ final ChannelFuture p = channel.write(h.takeMessage());
+ final GenericFutureListener<Future<Void>> l = h.takeListener();
+ if (l != null) {
+ p.addListener(l);
+ }
+
+ /*
+ * Check every WORKTIME_RECHECK_MSGS for exceeded time.
+ *
+ * XXX: given we already measure our flushing throughput, we
+ * should be able to perform dynamic adjustments here.
+ * Is that additional complexity needed, though?
+ */
+ if ((messages % WORKTIME_RECHECK_MSGS) == 0 &&
+ w.elapsed(TimeUnit.MICROSECONDS) >= maxWorkTime) {
+ LOG.trace("Exceeded allotted work time {}us", maxWorkTime);
+ break;
+ }
+ }
+
+ if (messages > 0) {
+ LOG.debug("Flushing {} message(s) to channel {}", messages, channel);
+ channel.flush();
+ }
+
+ w.stop();
+ LOG.debug("Flushed {} messages in {}us to channel {}",
+ messages, w.elapsed(TimeUnit.MICROSECONDS), channel);
+
+ /*
+ * We are almost ready to terminate. This is a bit tricky, because
+ * we do not want to have a race window where a message would be
+ * stuck on the queue without a flush being scheduled.
+ *
+ * So we mark ourselves as not running and then re-check if a
+ * flush is needed. That will re-synchronize with other threads
+ * such that only one flush is scheduled at any given time.
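+ *
+ * Concretely: without this re-check, a producer could enqueue() a message,
+ * observe flushScheduled == 1 just before we clear it, skip scheduling,
+ * and leave that message stranded until the next write arrives.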
+ */
+ if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
+ LOG.warn("Channel {} queue {} flusher found unscheduled", channel, queue);
+ }
+
+ conditionalFlush();
+ }
+
+ private void conditionalFlush(final ChannelHandlerContext ctx) {
+ Preconditions.checkState(ctx.channel() == channel, "Inconsistent channel %s with context %s", channel, ctx);
+ conditionalFlush();
+ }
+
+ @Override
+ public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+ super.channelActive(ctx);
+ conditionalFlush(ctx);
+ }
+
+ @Override
+ public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
+ super.channelWritabilityChanged(ctx);
+ conditionalFlush(ctx);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Channel %s queue [%s messages flushing=%s]", channel, queue.size(), flushScheduled);
+ }
+}
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.sal.common.util.RpcErrors;
/** Timeout after which unanswered RPC response objects are thrown away, in minutes */
public static final int RPC_RESPONSE_EXPIRATION = 1;
+ /**
+ * Default depth of the write queue, i.e. we allow this many messages
+ * to be queued up before rejecting further writes.
+ */
+ public static final int DEFAULT_QUEUE_DEPTH = 1024;
+
private static final Logger LOG = LoggerFactory
.getLogger(ConnectionAdapterImpl.class);
+ private static final Exception QUEUE_FULL_EXCEPTION =
+ new RejectedExecutionException("Output queue is full");
private static final String APPLICATION_TAG = "OPENFLOW_LIBRARY";
private static final String TAG = "OPENFLOW";
new RemovalListener<RpcResponseKey, ResponseExpectedRpcListener<?>>() {
@Override
public void onRemoval(
- RemovalNotification<RpcResponseKey, ResponseExpectedRpcListener<?>> notification) {
+ final RemovalNotification<RpcResponseKey, ResponseExpectedRpcListener<?>> notification) {
notification.getValue().discard();
}
};
/** expiring cache for future rpcResponses */
private final Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> responseCache;
+ private final ChannelOutboundQueue output;
private final Channel channel;
private ConnectionReadyListener connectionReadyListener;
.expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
.removalListener(REMOVAL_LISTENER).build();
this.channel = Preconditions.checkNotNull(channel);
+ this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH);
+ channel.pipeline().addLast(output);
LOG.debug("ConnectionAdapter created");
}
}
}
+ private <T> ListenableFuture<RpcResult<T>> enqueueMessage(final AbstractRpcListener<T> promise) {
+ LOG.debug("Submitting promise {}", promise);
+
+ if (!output.enqueue(promise)) {
+ LOG.debug("Message queue is full, rejecting execution");
+ promise.failedRpc(QUEUE_FULL_EXCEPTION);
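+ // failedRpc() completes the promise synchronously, so the future
+ // returned below is already in its terminal (failed) state.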
+ } else {
+ LOG.debug("Promise enqueued successfully");
+ }
+
+ return promise.getResult();
+ }
+
/**
* Sends the given message to the switch; the send result is reported
* via the returned future.
* @param input message to send
* @param failureInfo text to include in the RPC error should the send fail
* @return future providing the result of the send
*/
private ListenableFuture<RpcResult<Void>> sendToSwitchFuture(
final DataObject input, final String failureInfo) {
- final SimpleRpcListener listener = new SimpleRpcListener(failureInfo);
-
- LOG.debug("going to flush");
- channel.writeAndFlush(input).addListener(listener);
- LOG.debug("flushed");
-
- return listener.getResult();
+ return enqueueMessage(new SimpleRpcListener(input, failureInfo));
}
/**
final IN input, final Class<OUT> responseClazz, final String failureInfo) {
final RpcResponseKey key = new RpcResponseKey(input.getXid(), responseClazz.getName());
final ResponseExpectedRpcListener<OUT> listener =
- new ResponseExpectedRpcListener<>(failureInfo, responseCache, key);
-
- LOG.debug("going to flush");
- channel.writeAndFlush(input).addListener(listener);
- LOG.debug("flushed");
-
- return listener.getResult();
+ new ResponseExpectedRpcListener<>(input, failureInfo, responseCache, key);
+ return enqueueMessage(listener);
}
/**
private final Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> cache;
private final RpcResponseKey key;
- ResponseExpectedRpcListener(final String failureInfo,
+ ResponseExpectedRpcListener(final Object message, final String failureInfo,
final Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> cache, final RpcResponseKey key) {
- super(failureInfo);
+ super(message, failureInfo);
this.cache = Preconditions.checkNotNull(cache);
this.key = Preconditions.checkNotNull(key);
}
package org.opendaylight.openflowjava.protocol.impl.connection;
final class SimpleRpcListener extends AbstractRpcListener<Void> {
- public SimpleRpcListener(final String failureInfo) {
- super(failureInfo);
+ public SimpleRpcListener(final Object message, final String failureInfo) {
+ super(message, failureInfo);
}
@Override
import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
import org.opendaylight.openflowjava.protocol.impl.connection.ServerFacade;
-import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
import org.opendaylight.openflowjava.protocol.impl.deserialization.DeserializationFactory;
+import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* @author michal.polkorab
*/
public class TcpHandler implements ServerFacade {
+ /*
+ * High/low write watermarks, in KiB.
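+ *
+ * Netty reports the channel as non-writable once pending outbound data
+ * exceeds the high watermark, and as writable again only after it drains
+ * below the low watermark; ChannelOutboundQueue keys its flushing off
+ * this writability signal.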
+ */
+ private static final int DEFAULT_WRITE_HIGH_WATERMARK = 64;
+ private static final int DEFAULT_WRITE_LOW_WATERMARK = 32;
+ /*
+ * Write spin count. This tells netty to immediately retry a non-blocking
+ * write this many times before moving on to selecting.
+ */
+ private static final int DEFAULT_WRITE_SPIN_COUNT = 16;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TcpHandler.class);
private int port;
private String address;
- private InetAddress startupAddress;
+ private final InetAddress startupAddress;
private NioEventLoopGroup workerGroup;
private NioEventLoopGroup bossGroup;
- private static final Logger LOGGER = LoggerFactory.getLogger(TcpHandler.class);
- private SettableFuture<Boolean> isOnlineFuture;
-
-
- private PublishingChannelInitializer channelInitializer;
+ private final SettableFuture<Boolean> isOnlineFuture;
+
+ private final PublishingChannelInitializer channelInitializer;
/**
* Enum used for storing names of used components (in pipeline).
*/
DELEGATING_INBOUND_HANDLER,
}
-
+
/**
* Constructor of TCPHandler that listens on selected port.
*
* @param port listening port of TCPHandler server
*/
- public TcpHandler(int port) {
+ public TcpHandler(final int port) {
this(null, port);
}
-
+
/**
* Constructor of TCPHandler that listens on selected address and port.
* @param address listening address of TCPHandler server
* @param port listening port of TCPHandler server
*/
- public TcpHandler(InetAddress address, int port) {
+ public TcpHandler(final InetAddress address, final int port) {
this.port = port;
this.startupAddress = address;
channelInitializer = new PublishingChannelInitializer();
public void run() {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
+
+ /*
+ * We generally do not perform IO-unrelated tasks, so we want to have
+ * all outstanding tasks completed before the executing thread goes
+ * back into select.
+ *
+ * Any other setting means netty will measure the time it spent selecting
+ * and spend roughly proportional time executing tasks.
+ */
+ workerGroup.setIoRatio(100);
+
+ final ChannelFuture f;
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.childHandler(channelInitializer)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_REUSEADDR, true)
- .childOption(ChannelOption.SO_KEEPALIVE, true);
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, DEFAULT_WRITE_HIGH_WATERMARK * 1024)
+ .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, DEFAULT_WRITE_LOW_WATERMARK * 1024)
+ .childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT);
- ChannelFuture f;
if (startupAddress != null) {
f = b.bind(startupAddress.getHostAddress(), port).sync();
} else {
f = b.bind(port).sync();
}
-
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupted while binding port {}", port, e);
+ return;
+ }
+
+ try {
InetSocketAddress isa = (InetSocketAddress) f.channel().localAddress();
address = isa.getHostString();
- LOGGER.debug("address from tcphandler: " + address);
- port = isa.getPort();
+
+ // Update port, as it may have been specified as 0
+ this.port = isa.getPort();
+
+ LOGGER.debug("address from tcphandler: {}", address);
isOnlineFuture.set(true);
- LOGGER.info("Switch listener started and ready to accept incoming connections on port: " + port);
+ LOGGER.info("Switch listener started and ready to accept incoming connections on port: {}", port);
f.channel().closeFuture().sync();
- } catch (InterruptedException ex) {
- LOGGER.error(ex.getMessage(), ex);
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupted while waiting for port {} shutdown", port, e);
} finally {
shutdown();
}
@Override
public void operationComplete(
- io.netty.util.concurrent.Future<Object> downResult) throws Exception {
+ final io.netty.util.concurrent.Future<Object> downResult) throws Exception {
result.set(downResult.isSuccess());
if (downResult.cause() != null) {
result.setException(downResult.cause());
}
}
-
+
});
return result;
}
-
+
/**
- *
+ *
* @return number of connected clients / channels
*/
public int getNumberOfConnections() {
return channelInitializer.size();
}
-
+
/**
* @return channelInitializer providing channels
*/
public PublishingChannelInitializer getChannelInitializer() {
return channelInitializer;
}
-
+
@Override
public ListenableFuture<Boolean> getIsOnlineFuture() {
return isOnlineFuture;
}
-
+
/**
* @return the port
*/
public int getPort() {
return port;
}
-
+
/**
* @return the address
*/
* @param switchConnectionHandler
*/
public void setSwitchConnectionHandler(
- SwitchConnectionHandler switchConnectionHandler) {
+ final SwitchConnectionHandler switchConnectionHandler) {
channelInitializer.setSwitchConnectionHandler(switchConnectionHandler);
}
-
+
/**
* @param switchIdleTimeout in milliseconds
*/
- public void setSwitchIdleTimeout(long switchIdleTimeout) {
+ public void setSwitchIdleTimeout(final long switchIdleTimeout) {
channelInitializer.setSwitchIdleTimeout(switchIdleTimeout);
}
/**
* @param tlsSupported
*/
- public void setEncryption(boolean tlsSupported) {
+ public void setEncryption(final boolean tlsSupported) {
channelInitializer.setEncryption(tlsSupported);
}
/**
* @param sf serialization factory
*/
- public void setSerializationFactory(SerializationFactory sf) {
+ public void setSerializationFactory(final SerializationFactory sf) {
channelInitializer.setSerializationFactory(sf);
}
/**
* @param factory
*/
- public void setDeserializationFactory(DeserializationFactory factory) {
+ public void setDeserializationFactory(final DeserializationFactory factory) {
channelInitializer.setDeserializationFactory(factory);
}