Introduce ChannelOutboundQueue 36/7136/10
author Robert Varga <rovarga@cisco.com>
Sat, 17 May 2014 07:59:58 +0000 (09:59 +0200)
committer Michal Polkorab <michal.polkorab@pantheon.sk>
Fri, 30 May 2014 13:16:30 +0000 (13:16 +0000)
This patch introduces a tunable queue for outstanding requests. The core
idea is to maintain a bounded queue between the message producers and the
IO threads. The producers place their pre-made messages on the queue,
from which the IO thread picks them up and pushes them down the pipeline.
Should the queue be full, the caller is informed of this fact and can
choose a recovery strategy -- the current implementation reports a
RejectedExecutionException.
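
A minimal sketch of the producer-side contract described above (illustrative
only, not the code added by this patch; the class and method names are
hypothetical):

    import java.util.Queue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.RejectedExecutionException;

    final class BoundedOutboundSketch {
        // Bounded queue sitting between producers and the IO thread.
        private final Queue<Object> queue = new LinkedBlockingQueue<>(1024);

        // Producer-side entry point: never blocks, fails fast when the queue is full.
        void submit(final Object message) {
            if (!queue.offer(message)) {
                // Caller-visible recovery strategy, mirroring the behaviour described above.
                throw new RejectedExecutionException("Output queue is full");
            }
        }
    }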

The flush task is placed in the IO pool only when the channel is
writable and the queue is non-empty. A single run of the flush task is
time-limited, so a single channel which is being churned by a producer
does not end up monopolizing a particular thread for extended periods
of time -- after a tunable amount of time the flush task re-inserts
itself on the task queue and exits.
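
The time limiting works roughly as in the following sketch (a hedged
illustration under assumed names, not the patched ChannelOutboundQueue
itself): drain the queue while the budget lasts, then hand the thread back
and re-submit the task:

    import java.util.Queue;
    import java.util.concurrent.Executor;
    import java.util.concurrent.TimeUnit;

    final class FlushTaskSketch implements Runnable {
        private static final long MAX_WORK_NANOS = TimeUnit.MILLISECONDS.toNanos(100);

        private final Queue<Object> queue;
        private final Executor ioExecutor;

        FlushTaskSketch(final Queue<Object> queue, final Executor ioExecutor) {
            this.queue = queue;
            this.ioExecutor = ioExecutor;
        }

        @Override
        public void run() {
            final long deadline = System.nanoTime() + MAX_WORK_NANOS;
            Object message;
            while ((message = queue.poll()) != null) {
                write(message); // stands in for channel.write() in the real code
                if (System.nanoTime() >= deadline) {
                    // Budget exhausted: reschedule ourselves and yield the thread.
                    ioExecutor.execute(this);
                    return;
                }
            }
        }

        private void write(final Object message) {
            // Placeholder: the real implementation pushes the message down the Netty pipeline.
        }
    }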

Change-Id: If6f09c60cf2cf5a69e9f051e2dfe2bae3bb90b5d
Signed-off-by: Robert Varga <rovarga@cisco.com>
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/AbstractRpcListener.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ChannelOutboundQueue.java [new file with mode: 0644]
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ResponseExpectedRpcListener.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/SimpleRpcListener.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandler.java

index 2855592365ce632a58da956d72f8fb795f781bb2..170027140fbedd80fdced0788fcaeca0cf57b613 100644 (file)
@@ -19,16 +19,24 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
-abstract class AbstractRpcListener<T> implements GenericFutureListener<Future<? super Void>> {
+/**
+ * This class holds all the context we need for sending a single message down the tube:
+ * a MessageHolder (used in the queue) and the actual listener. It is not a thing of beauty,
+ * but it keeps us from allocating unnecessary objects in the egress path.
+ */
+abstract class AbstractRpcListener<T> implements GenericFutureListener<Future<Void>>, ChannelOutboundQueue.MessageHolder<Object> {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractRpcListener.class);
     private final SettableFuture<RpcResult<T>> result = SettableFuture.create();
     private final String failureInfo;
+    private Object message;
 
-    AbstractRpcListener(final String failureInfo) {
-        this.failureInfo = failureInfo;
+    AbstractRpcListener(final Object message, final String failureInfo) {
+        this.failureInfo = Preconditions.checkNotNull(failureInfo);
+        this.message = Preconditions.checkNotNull(message);
     }
 
     public final ListenableFuture<RpcResult<T>> getResult() {
@@ -36,7 +44,7 @@ abstract class AbstractRpcListener<T> implements GenericFutureListener<Future<?
     }
 
     @Override
-    public final void operationComplete(final Future<? super Void> future) {
+    public final void operationComplete(final Future<Void> future) {
         if (!future.isSuccess()) {
             LOG.debug("operation failed");
             failedRpc(future.cause());
@@ -46,12 +54,24 @@ abstract class AbstractRpcListener<T> implements GenericFutureListener<Future<?
         }
     }
 
+    @Override
+    public final Object takeMessage() {
+        final Object ret = message;
+        Preconditions.checkState(ret != null, "Message has already been taken");
+        message = null;
+        return ret;
+    }
+
+    @Override
+    public final GenericFutureListener<Future<Void>> takeListener() {
+        return this;
+    }
+
     abstract protected void operationSuccessful();
 
     protected final void failedRpc(final Throwable cause) {
         final RpcError rpcError = ConnectionAdapterImpl.buildRpcError(
                 failureInfo, ErrorSeverity.ERROR, "check switch connection", cause);
-
         result.set(Rpcs.getRpcResult(
                 false,
                 (T)null,
@@ -59,11 +79,9 @@ abstract class AbstractRpcListener<T> implements GenericFutureListener<Future<?
     }
 
     protected final void successfulRpc(final T value) {
-
         result.set(Rpcs.getRpcResult(
                 true,
                 value,
                 Collections.<RpcError>emptyList()));
     }
-
 }
diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ChannelOutboundQueue.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ChannelOutboundQueue.java
new file mode 100644 (file)
index 0000000..a3c038f
--- /dev/null
@@ -0,0 +1,248 @@
+/*
+ * Copyright (c) 2014 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowjava.protocol.impl.connection;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+
+/**
+ * Channel handler which sits on top of the normal Netty pipeline, allowing
+ * writes to be enqueued from any thread. It then schedules a flush task on the
+ * pipeline's executor, which shuffles messages from the queue into the pipeline.
+ *
+ * Note this is an *Inbound* handler, as it reacts to channel writability changing,
+ * which in the Netty vocabulary is an inbound event. This has already changed in
+ * the Netty 5.0.0 API, where Handlers are unified.
+ */
+final class ChannelOutboundQueue extends ChannelInboundHandlerAdapter {
+    public interface MessageHolder<T> {
+        /**
+         * Take ownership of the encapsulated listener. Guaranteed to
+         * be called at most once.
+         *
+         * @return listener encapsulated in the holder, may be null
+         * @throws IllegalStateException if the listener is no longer
+         *         available (for example because it has already been
+         *         taken).
+         */
+        GenericFutureListener<Future<Void>> takeListener();
+
+        /**
+         * Take ownership of the encapsulated message. Guaranteed to be
+         * called at most once.
+         *
+         * @return message encapsulated in the holder, may not be null
+         * @throws IllegalStateException if the message is no longer
+         *         available (for example because it has already been
+         *         taken).
+         */
+        T takeMessage();
+    }
+
+    /**
+     * This is the default upper bound we place on the flush task running
+     * a single iteration. We relinquish control after about this amount
+     * of time.
+     */
+    private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
+
+    /**
+     * We re-check the time spent flushing every this many messages. We do this because
+     * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
+     * or similar to disable the feature.
+     */
+    private static final int WORKTIME_RECHECK_MSGS = 64;
+    private static final Logger LOG = LoggerFactory.getLogger(ChannelOutboundQueue.class);
+
+    // Passed to executor to request triggering of flush
+    private final Runnable flushRunnable = new Runnable() {
+        @Override
+        public void run() {
+            ChannelOutboundQueue.this.flush();
+        }
+    };
+
+    /*
+     * Instead of using an AtomicBoolean object, we use these two. It saves us
+     * from allocating an extra object.
+     */
+    private static final AtomicIntegerFieldUpdater<ChannelOutboundQueue> FLUSH_SCHEDULED_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundQueue.class, "flushScheduled");
+    private volatile int flushScheduled = 0;
+
+    private final Queue<MessageHolder<?>> queue;
+    private final long maxWorkTime;
+    private final Channel channel;
+
+    public ChannelOutboundQueue(final Channel channel, final int queueDepth) {
+        Preconditions.checkArgument(queueDepth > 0, "Queue depth has to be positive");
+
+        /*
+         * This looks like a good trade-off for throughput. Alternative is
+         * to use an ArrayBlockingQueue -- but that uses a single lock to
+         * synchronize both producers and consumers, potentially leading
+         * to less throughput.
+         */
+        this.queue = new LinkedBlockingQueue<>(queueDepth);
+        this.channel = Preconditions.checkNotNull(channel);
+        this.maxWorkTime = DEFAULT_WORKTIME_MICROS;
+    }
+
+    /**
+     * Enqueue a message holder for transmission. This is a thread-safe entry point
+     * for the channel; a full queue is reported to the caller via the return value.
+     *
+     * @param holder MessageHolder which should be enqueued
+     * @return Success indicator, true if the enqueue operation succeeded,
+     *         false if the queue is full.
+     */
+    public boolean enqueue(final MessageHolder<?> holder) {
+        LOG.trace("Enqueuing message {}", holder);
+        if (queue.offer(holder)) {
+            LOG.trace("Message enqueued");
+            conditionalFlush();
+            return true;
+        }
+
+        LOG.trace("Message queue is full");
+        return false;
+    }
+
+    private void scheduleFlush(final EventExecutor executor) {
+        if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
+            LOG.trace("Scheduling flush task");
+            executor.execute(flushRunnable);
+        } else {
+            LOG.trace("Flush task is already present");
+        }
+    }
+
+    /**
+     * Schedule a queue flush if it is not empty and the channel is found
+     * to be writable.
+     */
+    private void conditionalFlush() {
+        if (queue.isEmpty()) {
+            LOG.trace("Queue is empty, not flush needed");
+            return;
+        }
+        if (!channel.isWritable()) {
+            LOG.trace("Channel {} is not writable, not issuing a flush", channel);
+            return;
+        }
+
+        scheduleFlush(channel.pipeline().lastContext().executor());
+    }
+
+    /*
+     * The synchronized keyword should be unnecessary, really, but it enforces
+     * queue order should something go terribly wrong. It should be completely
+     * uncontended.
+     */
+    private synchronized void flush() {
+        final Stopwatch w = new Stopwatch().start();
+
+        LOG.debug("Dequeuing messages to channel {}", channel);
+
+        long messages = 0;
+        for (;; ++messages) {
+            if (!channel.isWritable()) {
+                LOG.trace("Channel is no longer writable");
+                break;
+            }
+
+            final MessageHolder<?> h = queue.poll();
+            if (h == null) {
+                LOG.trace("The queue is completely drained");
+                break;
+            }
+
+            final ChannelFuture p = channel.write(h.takeMessage());
+            final GenericFutureListener<Future<Void>> l = h.takeListener();
+            if (l != null) {
+                p.addListener(l);
+            }
+
+            /*
+             * Check every WORKTIME_RECHECK_MSGS for exceeded time.
+             *
+             * XXX: given we already measure our flushing throughput, we
+             *      should be able to perform dynamic adjustments here.
+             *      is that additional complexity needed, though?
+             */
+            if ((messages % WORKTIME_RECHECK_MSGS) == 0 &&
+                    w.elapsed(TimeUnit.MICROSECONDS) >= maxWorkTime) {
+                LOG.trace("Exceeded allotted work time {}us", maxWorkTime);
+                break;
+            }
+        }
+
+        if (messages > 0) {
+            LOG.debug("Flushing {} message(s) to channel {}", messages, channel);
+            channel.flush();
+        }
+
+        w.stop();
+        LOG.debug("Flushed {} messages in {}us to channel {}",
+                messages, w.elapsed(TimeUnit.MICROSECONDS), channel);
+
+        /*
+         * We are almost ready to terminate. This is a bit tricky, because
+         * we do not want to have a race window where a message would be
+         * stuck on the queue without a flush being scheduled.
+         *
+         * So we mark ourselves as not running and then re-check if a
+         * flush is needed. That will re-synchronize with other threads
+         * such that only one flush is scheduled at any given time.
+         */
+        if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
+            LOG.warn("Channel {} queue {} flusher found unscheduled", channel, queue);
+        }
+
+        conditionalFlush();
+    }
+
+    private void conditionalFlush(final ChannelHandlerContext ctx) {
+        Preconditions.checkState(ctx.channel() == channel, "Inconsistent channel %s with context %s", channel, ctx);
+        conditionalFlush();
+    }
+
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+        super.channelActive(ctx);
+        conditionalFlush(ctx);
+    }
+
+    @Override
+    public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
+        super.channelWritabilityChanged(ctx);
+        conditionalFlush(ctx);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Channel %s queue [%s messages flushing=%s]", channel, queue.size(), flushScheduled);
+    }
+}
index 28cee017230d8116a17f9408feb1631b46275805..6fd5fd7f55949d68c41316f5c935658955b5d203 100644 (file)
@@ -15,6 +15,7 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import org.opendaylight.controller.sal.common.util.RpcErrors;
@@ -84,8 +85,16 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
     /** after this time, RPC future response objects will be thrown away (in minutes) */
     public static final int RPC_RESPONSE_EXPIRATION = 1;
 
+    /**
+     * Default depth of the write queue, i.e. we allow this many messages
+     * to be queued up before rejecting further requests.
+     */
+    public static final int DEFAULT_QUEUE_DEPTH = 1024;
+
     private static final Logger LOG = LoggerFactory
             .getLogger(ConnectionAdapterImpl.class);
+    private static final Exception QUEUE_FULL_EXCEPTION =
+            new RejectedExecutionException("Output queue is full");
 
     private static final String APPLICATION_TAG = "OPENFLOW_LIBRARY";
     private static final String TAG = "OPENFLOW";
@@ -93,7 +102,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
             new RemovalListener<RpcResponseKey, ResponseExpectedRpcListener<?>>() {
         @Override
         public void onRemoval(
-                RemovalNotification<RpcResponseKey, ResponseExpectedRpcListener<?>> notification) {
+                final RemovalNotification<RpcResponseKey, ResponseExpectedRpcListener<?>> notification) {
             notification.getValue().discard();
         }
     };
@@ -101,6 +110,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
     /** expiring cache for future rpcResponses */
     private final Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> responseCache;
 
+    private final ChannelOutboundQueue output;
     private final Channel channel;
 
     private ConnectionReadyListener connectionReadyListener;
@@ -118,6 +128,8 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
                 .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
                 .removalListener(REMOVAL_LISTENER).build();
         this.channel = Preconditions.checkNotNull(channel);
+        this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH);
+        channel.pipeline().addLast(output);
         LOG.debug("ConnectionAdapter created");
     }
 
@@ -303,6 +315,19 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
         }
     }
 
+    private <T> ListenableFuture<RpcResult<T>> enqueueMessage(final AbstractRpcListener<T> promise) {
+        LOG.debug("Submitting promise {}", promise);
+
+        if (!output.enqueue(promise)) {
+            LOG.debug("Message queue is full, rejecting execution");
+            promise.failedRpc(QUEUE_FULL_EXCEPTION);
+        } else {
+            LOG.debug("Promise enqueued successfully");
+        }
+
+        return promise.getResult();
+    }
+
     /**
      * sends given message to switch, sending result will be reported via return value
      * @param input message to send
@@ -315,13 +340,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
      */
     private ListenableFuture<RpcResult<Void>> sendToSwitchFuture(
             final DataObject input, final String failureInfo) {
-        final SimpleRpcListener listener = new SimpleRpcListener(failureInfo);
-
-        LOG.debug("going to flush");
-        channel.writeAndFlush(input).addListener(listener);
-        LOG.debug("flushed");
-
-        return listener.getResult();
+        return enqueueMessage(new SimpleRpcListener(input, failureInfo));
     }
 
     /**
@@ -343,13 +362,8 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
             final IN input, final Class<OUT> responseClazz, final String failureInfo) {
         final RpcResponseKey key = new RpcResponseKey(input.getXid(), responseClazz.getName());
         final ResponseExpectedRpcListener<OUT> listener =
-                new ResponseExpectedRpcListener<>(failureInfo, responseCache, key);
-
-        LOG.debug("going to flush");
-        channel.writeAndFlush(input).addListener(listener);
-        LOG.debug("flushed");
-
-        return listener.getResult();
+                new ResponseExpectedRpcListener<>(input, failureInfo, responseCache, key);
+        return enqueueMessage(listener);
     }
 
     /**
index ea43d7e0efa659fe48c40dace3d6905790137678..a5bf896bd3ce6f80277f9f8625668a4e87cfc7d6 100644 (file)
@@ -21,9 +21,9 @@ final class ResponseExpectedRpcListener<T extends OfHeader> extends AbstractRpcL
     private final Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> cache;
     private final RpcResponseKey key;
 
-    ResponseExpectedRpcListener(final String failureInfo,
+    ResponseExpectedRpcListener(final Object message, final String failureInfo,
             final Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> cache, final RpcResponseKey key) {
-        super(failureInfo);
+        super(message, failureInfo);
         this.cache = Preconditions.checkNotNull(cache);
         this.key = Preconditions.checkNotNull(key);
     }
index 1dc1f566cb8b1f1549cb7396c7ea0706cd460926..fa675bc59ec420de71a581506183d6f4b86d0201 100644 (file)
@@ -8,8 +8,8 @@
 package org.opendaylight.openflowjava.protocol.impl.connection;
 
 final class SimpleRpcListener extends AbstractRpcListener<Void> {
-    public SimpleRpcListener(final String failureInfo) {
-        super(failureInfo);
+    public SimpleRpcListener(final Object message, final String failureInfo) {
+        super(message, failureInfo);
     }
 
     @Override
index 2b5f6792c3638c40690ebd2fea2a163f4eb865cb..5cd978c361d186980954a39b434e05d1a56254eb 100644 (file)
@@ -22,8 +22,8 @@ import java.net.InetSocketAddress;
 
 import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
 import org.opendaylight.openflowjava.protocol.impl.connection.ServerFacade;
-import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
 import org.opendaylight.openflowjava.protocol.impl.deserialization.DeserializationFactory;
+import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,17 +36,27 @@ import com.google.common.util.concurrent.SettableFuture;
  * @author michal.polkorab
  */
 public class TcpHandler implements ServerFacade {
+    /*
+     * High/low write watermarks, in KiB.
+     */
+    private static final int DEFAULT_WRITE_HIGH_WATERMARK = 64;
+    private static final int DEFAULT_WRITE_LOW_WATERMARK = 32;
+    /*
+     * Write spin count. This tells netty to immediately retry a non-blocking
+     * write this many times before moving on to selecting.
+     */
+    private static final int DEFAULT_WRITE_SPIN_COUNT = 16;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TcpHandler.class);
 
     private int port;
     private String address;
-    private InetAddress startupAddress;
+    private final InetAddress startupAddress;
     private NioEventLoopGroup workerGroup;
     private NioEventLoopGroup bossGroup;
-    private static final Logger LOGGER = LoggerFactory.getLogger(TcpHandler.class);
-    private SettableFuture<Boolean> isOnlineFuture;
-    
-    
-    private PublishingChannelInitializer channelInitializer;
+    private final SettableFuture<Boolean> isOnlineFuture;
+
+    private final PublishingChannelInitializer channelInitializer;
 
     /**
      * Enum used for storing names of used components (in pipeline).
@@ -86,23 +96,23 @@ public class TcpHandler implements ServerFacade {
          */
         DELEGATING_INBOUND_HANDLER,
     }
-    
+
 
     /**
      * Constructor of TCPHandler that listens on selected port.
      *
      * @param port listening port of TCPHandler server
      */
-    public TcpHandler(int port) {
+    public TcpHandler(final int port) {
         this(null, port);
     }
-    
+
     /**
      * Constructor of TCPHandler that listens on selected address and port.
      * @param address listening address of TCPHandler server
      * @param port listening port of TCPHandler server
      */
-    public TcpHandler(InetAddress address, int port) {
+    public TcpHandler(final InetAddress address, final int port) {
         this.port = port;
         this.startupAddress = address;
         channelInitializer = new PublishingChannelInitializer();
@@ -116,6 +126,18 @@ public class TcpHandler implements ServerFacade {
     public void run() {
         bossGroup = new NioEventLoopGroup();
         workerGroup = new NioEventLoopGroup();
+
+        /*
+         * We generally do not perform IO-unrelated tasks, so we want to have
+         * all outstanding tasks completed before the executing thread goes
+         * back into select.
+         *
+         * Any other setting means netty will measure the time it spent selecting
+         * and spend roughly proportional time executing tasks.
+         */
+        workerGroup.setIoRatio(100);
+
+        final ChannelFuture f;
         try {
             ServerBootstrap b = new ServerBootstrap();
             b.group(bossGroup, workerGroup)
@@ -124,24 +146,34 @@ public class TcpHandler implements ServerFacade {
                     .childHandler(channelInitializer)
                     .option(ChannelOption.SO_BACKLOG, 128)
                     .option(ChannelOption.SO_REUSEADDR, true)
-                    .childOption(ChannelOption.SO_KEEPALIVE, true);
+                    .childOption(ChannelOption.SO_KEEPALIVE, true)
+                    .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, DEFAULT_WRITE_HIGH_WATERMARK * 1024)
+                    .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, DEFAULT_WRITE_LOW_WATERMARK * 1024)
+                    .childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT);
 
-            ChannelFuture f;
             if (startupAddress != null) {
                 f = b.bind(startupAddress.getHostAddress(), port).sync();
             } else {
                 f = b.bind(port).sync();
             }
-            
+        } catch (InterruptedException e) {
+            LOGGER.error("Interrupted while binding port {}", port, e);
+            return;
+        }
+
+        try {
             InetSocketAddress isa = (InetSocketAddress) f.channel().localAddress();
             address = isa.getHostString();
-            LOGGER.debug("address from tcphandler: " + address);
-            port = isa.getPort();
+
+            // Update port, as it may have been specified as 0
+            this.port = isa.getPort();
+
+            LOGGER.debug("address from tcphandler: {}", address);
             isOnlineFuture.set(true);
-            LOGGER.info("Switch listener started and ready to accept incoming connections on port: " + port);
+            LOGGER.info("Switch listener started and ready to accept incoming connections on port: {}", port);
             f.channel().closeFuture().sync();
-        } catch (InterruptedException ex) {
-            LOGGER.error(ex.getMessage(), ex);
+        } catch (InterruptedException e) {
+            LOGGER.error("Interrupted while waiting for port {} shutdown", port, e);
         } finally {
             shutdown();
         }
@@ -159,44 +191,44 @@ public class TcpHandler implements ServerFacade {
 
             @Override
             public void operationComplete(
-                    io.netty.util.concurrent.Future<Object> downResult) throws Exception {
+                    final io.netty.util.concurrent.Future<Object> downResult) throws Exception {
                 result.set(downResult.isSuccess());
                 if (downResult.cause() != null) {
                     result.setException(downResult.cause());
                 }
             }
-            
+
         });
         return result;
     }
-    
+
     /**
-     * 
+     *
      * @return number of connected clients / channels
      */
     public int getNumberOfConnections() {
         return channelInitializer.size();
     }
-    
+
     /**
      * @return channelInitializer providing channels
      */
     public PublishingChannelInitializer getChannelInitializer() {
         return channelInitializer;
     }
-    
+
     @Override
     public ListenableFuture<Boolean> getIsOnlineFuture() {
         return isOnlineFuture;
     }
-    
+
     /**
      * @return the port
      */
     public int getPort() {
         return port;
     }
-    
+
     /**
      * @return the address
      */
@@ -208,35 +240,35 @@ public class TcpHandler implements ServerFacade {
      * @param switchConnectionHandler
      */
     public void setSwitchConnectionHandler(
-            SwitchConnectionHandler switchConnectionHandler) {
+            final SwitchConnectionHandler switchConnectionHandler) {
         channelInitializer.setSwitchConnectionHandler(switchConnectionHandler);
     }
-    
+
     /**
      * @param switchIdleTimeout in milliseconds
      */
-    public void setSwitchIdleTimeout(long switchIdleTimeout) {
+    public void setSwitchIdleTimeout(final long switchIdleTimeout) {
         channelInitializer.setSwitchIdleTimeout(switchIdleTimeout);
     }
 
     /**
      * @param tlsSupported
      */
-    public void setEncryption(boolean tlsSupported) {
+    public void setEncryption(final boolean tlsSupported) {
         channelInitializer.setEncryption(tlsSupported);
     }
 
     /**
      * @param sf serialization factory
      */
-    public void setSerializationFactory(SerializationFactory sf) {
+    public void setSerializationFactory(final SerializationFactory sf) {
         channelInitializer.setSerializationFactory(sf);
     }
 
     /**
      * @param factory
      */
-    public void setDeserializationFactory(DeserializationFactory factory) {
+    public void setDeserializationFactory(final DeserializationFactory factory) {
         channelInitializer.setDeserializationFactory(factory);
     }