Bug 2245 - Fixed: Avoid cycle between Java packages
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / ChannelOutboundQueue.java
diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ChannelOutboundQueue.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ChannelOutboundQueue.java
new file mode 100644 (file)
index 0000000..47d694d
--- /dev/null
@@ -0,0 +1,278 @@
+/*
+ * Copyright (c) 2014 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.net.InetSocketAddress;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Channel handler which sits on top of the normal Netty pipeline, allowing
+ * writes to be enqueued from any thread. It then schedules a pipeline task
+ * which shuffles messages from the queue into the pipeline.
+ *
+ * Note this is an *Inbound* handler, as it reacts to channel writability changing,
+ * which in the Netty vocabulary is an inbound event. This has already changed in
+ * the Netty 5.0.0 API, where Handlers are unified.
+ */
+final class ChannelOutboundQueue extends ChannelInboundHandlerAdapter {
+    public interface MessageHolder<T> {
+        /**
+         * Take ownership of the encapsulated listener. Guaranteed to
+         * be called at most once.
+         *
+         * @return listener encapsulated in the holder, may be null
+         * @throws IllegalStateException if the listener is no longer
+         *         available (for example because it has already been
+         *         taken).
+         */
+        GenericFutureListener<Future<Void>> takeListener();
+
+        /**
+         * Take ownership of the encapsulated message. Guaranteed to be
+         * called at most once.
+         *
+         * @return message encapsulated in the holder, may not be null
+         * @throws IllegalStateException if the message is no longer
+         *         available (for example because it has already been
+         *         taken).
+         */
+        T takeMessage();
+    }
+
+    /**
+     * This is the default upper bound we place on the flush task running
+     * a single iteration. We relinquish control after about this amount
+     * of time.
+     */
+    private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
+
+    /**
+     * We re-check the time spent flushing every this many messages. We do this because
+     * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
+     * or similar to disable the feature.
+     */
+    private static final int WORKTIME_RECHECK_MSGS = 64;
+    private static final Logger LOG = LoggerFactory.getLogger(ChannelOutboundQueue.class);
+
+    // Passed to executor to request triggering of flush
+    private final Runnable flushRunnable = new Runnable() {
+        @Override
+        public void run() {
+            ChannelOutboundQueue.this.flush();
+        }
+    };
+
+    /*
+     * Instead of using an AtomicBoolean object, we use these two. It saves us
+     * from allocating an extra object.
+     */
+    private static final AtomicIntegerFieldUpdater<ChannelOutboundQueue> FLUSH_SCHEDULED_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundQueue.class, "flushScheduled");
+    private volatile int flushScheduled = 0;
+
+    private final Queue<MessageHolder<?>> queue;
+    private final long maxWorkTime;
+    private final Channel channel;
+    private final InetSocketAddress address;
+
+    public ChannelOutboundQueue(final Channel channel, final int queueDepth, final InetSocketAddress address) {
+        Preconditions.checkArgument(queueDepth > 0, "Queue depth has to be positive");
+
+        /*
+         * This looks like a good trade-off for throughput. Alternative is
+         * to use an ArrayBlockingQueue -- but that uses a single lock to
+         * synchronize both producers and consumers, potentially leading
+         * to less throughput.
+         */
+        this.queue = new LinkedBlockingQueue<>(queueDepth);
+        this.channel = Preconditions.checkNotNull(channel);
+        this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
+        this.address = address;
+    }
+
+    /**
+     * Enqueue a message holder for transmission. This is a thread-safe entry
+     * point for the channel; returns false if the holder cannot be queued.
+     *
+     * @param holder MessageHolder which should be enqueued
+     * @return Success indicator, true if the enqueue operation succeeded,
+     *         false if the queue is full.
+     */
+    public boolean enqueue(final MessageHolder<?> holder) {
+        LOG.trace("Enqueuing message {}", holder);
+        if (queue.offer(holder)) {
+            LOG.trace("Message enqueued");
+            conditionalFlush();
+            return true;
+        }
+
+        LOG.trace("Message queue is full");
+        return false;
+    }
+
+    private void scheduleFlush(final EventExecutor executor) {
+        if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
+            LOG.trace("Scheduling flush task");
+            executor.execute(flushRunnable);
+        } else {
+            LOG.trace("Flush task is already present");
+        }
+    }
+
+    /**
+     * Schedule a queue flush if it is not empty and the channel is found
+     * to be writable.
+     */
+    private void conditionalFlush() {
+        if (queue.isEmpty()) {
+            LOG.trace("Queue is empty, no flush needed");
+            return;
+        }
+        if (!channel.isWritable()) {
+            LOG.trace("Channel {} is not writable, not issuing a flush", channel);
+            return;
+        }
+
+        scheduleFlush(channel.pipeline().lastContext().executor());
+    }
+
+    /*
+     * The synchronized keyword should be unnecessary, really, but it enforces
+     * queue order should something go terribly wrong. It should be completely
+     * uncontended.
+     */
+    private synchronized void flush() {
+        final long start = System.nanoTime();
+        final long deadline = start + maxWorkTime;
+
+        LOG.debug("Dequeuing messages to channel {}", channel);
+
+        long messages = 0;
+        for (;; ++messages) {
+            if (!channel.isWritable()) {
+                LOG.trace("Channel is no longer writable");
+                break;
+            }
+
+            final MessageHolder<?> h = queue.poll();
+            if (h == null) {
+                LOG.trace("The queue is completely drained");
+                break;
+            }
+
+            final GenericFutureListener<Future<Void>> l = h.takeListener();
+
+            final ChannelFuture p;
+            if (address == null) {
+                p = channel.write(new MessageListenerWrapper(h.takeMessage(), l));
+            } else {
+                p = channel.write(new UdpMessageListenerWrapper(h.takeMessage(), l, address));
+            }
+            if (l != null) {
+                p.addListener(l);
+            }
+
+            /*
+             * Check every WORKTIME_RECHECK_MSGS for exceeded time.
+             *
+             * XXX: given we already measure our flushing throughput, we
+             *      should be able to perform dynamic adjustments here.
+             *      is that additional complexity needed, though?
+             */
+            if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
+                LOG.trace("Exceeded allotted work time {}us",
+                        TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
+                break;
+            }
+        }
+
+        if (messages > 0) {
+            LOG.debug("Flushing {} message(s) to channel {}", messages, channel);
+            channel.flush();
+        }
+
+        final long stop = System.nanoTime();
+        LOG.debug("Flushed {} messages in {}us to channel {}",
+                messages, TimeUnit.NANOSECONDS.toMicros(stop - start), channel);
+
+        /*
+         * We are almost ready to terminate. This is a bit tricky, because
+         * we do not want to have a race window where a message would be
+         * stuck on the queue without a flush being scheduled.
+         *
+         * So we mark ourselves as not running and then re-check whether a
+         * flush is needed. That re-synchronizes with other threads such that
+         * only one flush is scheduled at any given time.
+         */
+        if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
+            LOG.warn("Channel {} queue {} flusher found unscheduled", channel, queue);
+        }
+
+        conditionalFlush();
+    }
+
+    private void conditionalFlush(final ChannelHandlerContext ctx) {
+        Preconditions.checkState(ctx.channel() == channel, "Inconsistent channel %s with context %s", channel, ctx);
+        conditionalFlush();
+    }
+
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+        super.channelActive(ctx);
+        conditionalFlush(ctx);
+    }
+
+    @Override
+    public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
+        super.channelWritabilityChanged(ctx);
+        conditionalFlush(ctx);
+    }
+
+    @Override
+    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+        super.channelInactive(ctx);
+
+        long entries = 0;
+        LOG.debug("Channel shutdown, flushing queue...");
+        final Future<Void> result = ctx.newFailedFuture(new RejectedExecutionException("Channel disconnected"));
+        while (true) {
+            final MessageHolder<?> e = queue.poll();
+            if (e == null) {
+                break;
+            }
+
+            // takeListener() may legitimately return null, so guard against it
+            final GenericFutureListener<Future<Void>> l = e.takeListener();
+            if (l != null) {
+                l.operationComplete(result);
+            }
+            entries++;
+        }
+
+        LOG.debug("Flushed {} queue entries", entries);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Channel %s queue [%s messages flushing=%s]", channel, queue.size(), flushScheduled);
+    }
+}
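
For context (not part of the commit above): a minimal sketch of how a producer thread in this package might wrap a message in a MessageHolder and hand it to the queue. SimpleHolder, request, listener and outboundQueue are hypothetical placeholders for illustration only; the sketch assumes the same package and imports as the file above, and the actual holder implementations and callers are outside the scope of this diff.

final class SimpleHolder<T> implements ChannelOutboundQueue.MessageHolder<T> {
    private T message;
    private GenericFutureListener<Future<Void>> listener;

    SimpleHolder(final T message, final GenericFutureListener<Future<Void>> listener) {
        this.message = Preconditions.checkNotNull(message);
        this.listener = listener;
    }

    @Override
    public GenericFutureListener<Future<Void>> takeListener() {
        // Transfer ownership of the (possibly null) listener; the queue calls this at most once
        final GenericFutureListener<Future<Void>> ret = listener;
        listener = null;
        return ret;
    }

    @Override
    public T takeMessage() {
        // Transfer ownership of the message; never null per the interface contract
        Preconditions.checkState(message != null, "Message already taken");
        final T ret = message;
        message = null;
        return ret;
    }
}

// enqueue() is the thread-safe entry point; a false return means the queue is
// full and the caller has to apply back-pressure (retry later, fail the request).
if (!outboundQueue.enqueue(new SimpleHolder<>(request, listener))) {
    // Queue full: back off or report the failure to the caller
}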