From deca37b27340c6e9b1576c281982d5d7f4ee1795 Mon Sep 17 00:00:00 2001
From: Vaclav Demcak
Date: Sat, 26 Sep 2015 23:27:24 +0200
Subject: [PATCH] Barrier turn on/off - Split OutboundQueueManager

* Split OutboundQueueManager functionality into an abstract parent, separating
  the barrier handling from the channel management

Change-Id: Iff85917145034689fa494a679bf6a65b969c775b
Signed-off-by: Vaclav Demcak
---
 .../AbstractOutboundQueueManager.java           | 353 ++++++++++++++++++
 .../core/connection/OutboundQueueManager.java   | 313 +----------------
 .../core/connection/StackedOutboundQueue.java   |   7 +-
 3 files changed, 361 insertions(+), 312 deletions(-)
 create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java

diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java
new file mode 100644
index 00000000..49c6ddfa
--- /dev/null
+++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java
@@ -0,0 +1,353 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import com.google.common.base.Preconditions;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nonnull;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that encapsulates the basic processing for stacking requests on a netty channel
+ * and provides the functionality for pairing request/response messages exchanged with the device.
+ */
+abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter
+        implements AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractOutboundQueueManager.class);
+
+    private static enum PipelineState {
+        /**
+         * Netty thread is potentially idle, no assumptions
+         * can be made about its state.
+         */
+        IDLE,
+        /**
+         * Netty thread is currently reading, once the read completes,
+         * it will flush the queue in the {@link #WRITING} state.
+         */
+        READING,
+        /**
+         * Netty thread is currently performing a flush on the queue.
+         * It will then transition to {@link #IDLE} state.
+         */
+        WRITING,
+    }
+
+    /**
+     * Default low write watermark. Channel will become writable when number of outstanding
+     * bytes dips below this value.
+     */
+    private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;
+
+    /**
+     * Default write high watermark. Channel will become un-writable when number of
+     * outstanding bytes hits this value.
+     */
+    private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
+
+    private final AtomicBoolean flushScheduled = new AtomicBoolean();
+    protected final ConnectionAdapterImpl parent;
+    protected final InetSocketAddress address;
+    protected final StackedOutboundQueue currentQueue;
+    private final T handler;
+
+    // Accessed concurrently
+    private volatile PipelineState state = PipelineState.IDLE;
+
+    // Updated from netty only
+    private boolean alreadyReading;
+    protected boolean shuttingDown;
+
+    // Passed to executor to request triggering of flush
+    protected final Runnable flushRunnable = new Runnable() {
+        @Override
+        public void run() {
+            flush();
+        }
+    };
+
+    AbstractOutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler) {
+        this.parent = Preconditions.checkNotNull(parent);
+        this.handler = Preconditions.checkNotNull(handler);
+        this.address = address;
+
+        currentQueue = new StackedOutboundQueue(this);
+        LOG.debug("Queue manager instantiated with queue {}", currentQueue);
+
+        handler.onConnectionQueueChanged(currentQueue);
+    }
+
+    @Override
+    public void close() {
+        handler.onConnectionQueueChanged(null);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
+    }
+
+    @Override
+    public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
+        /*
+         * Tune channel write buffering. We increase the writability window
+         * to ensure we can flush an entire queue segment in one go. We definitely
+         * want to keep the difference above 64k, as that will ensure we use jam-packed
+         * TCP packets. UDP will fragment as appropriate.
+         */
+        ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
+        ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
+
+        super.handlerAdded(ctx);
+    }
+
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+        super.channelActive(ctx);
+        conditionalFlush();
+    }
+
+    @Override
+    public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
+        super.channelReadComplete(ctx);
+
+        // Run flush regardless of writability. This is not strictly required, as
+        // there may be a scheduled flush. Instead of canceling it, which is expensive,
+        // we'll steal its work. Note that more work may accumulate in the time window
+        // between now and when the task will run, so it may not be a no-op after all.
+        //
+        // The reason for this is to fill the output buffer before we go into the selection
+        // phase. This will make sure the pipe is full (in which case our next wake-up
+        // will be the queue becoming writable).
+        writeAndFlush();
+    }
+
+    @Override
+    public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
+        super.channelWritabilityChanged(ctx);
+
+        // The channel is writable again. There may be a flush task on the way, but let's
+        // steal its work, potentially decreasing latency. Since there is a window between
+        // now and when it will run, it may still pick up some more work to do.
+ LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel()); + writeAndFlush(); + } + + @Override + public void channelInactive(final ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + + LOG.debug("Channel {} initiating shutdown...", ctx.channel()); + + shuttingDown = true; + final long entries = currentQueue.startShutdown(ctx.channel()); + LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel()); + + scheduleFlush(); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { + // Netty does not provide a 'start reading' callback, so this is our first + // (and repeated) chance to detect reading. Since this callback can be invoked + // multiple times, we keep a boolean we check. That prevents a volatile write + // on repeated invocations. It will be cleared in channelReadComplete(). + if (!alreadyReading) { + alreadyReading = true; + state = PipelineState.READING; + } + super.channelRead(ctx, msg); + } + + /** + * Invoked whenever a message comes in from the switch. Runs matching + * on all active queues in an attempt to complete a previous request. + * + * @param message Potential response message + * @return True if the message matched a previous request, false otherwise. + */ + boolean onMessage(final OfHeader message) { + LOG.trace("Attempting to pair message {} to a request", message); + + return currentQueue.pairRequest(message); + } + + T getHandler() { + return handler; + } + + void ensureFlushing() { + // If the channel is not writable, there's no point in waking up, + // once we become writable, we will run a full flush + if (!parent.getChannel().isWritable()) { + return; + } + + // We are currently reading something, just a quick sync to ensure we will in fact + // flush state. + final PipelineState localState = state; + LOG.debug("Synchronize on pipeline state {}", localState); + switch (localState) { + case READING: + // Netty thread is currently reading, it will flush the pipeline once it + // finishes reading. This is a no-op situation. + break; + case WRITING: + case IDLE: + default: + // We cannot rely on the change being flushed, schedule a request + scheduleFlush(); + } + } + + /** + * Method immediately response on Echo message. + * + * @param message incoming Echo message from device + */ + void onEchoRequest(final EchoRequestMessage message) { + final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()) + .setVersion(message.getVersion()).setXid(message.getXid()).build(); + parent.getChannel().writeAndFlush(makeMessageListenerWrapper(reply)); + } + + /** + * Wraps outgoing message and includes listener attached to this message + * which is send to OFEncoder for serialization. Correct wrapper is + * selected by communication pipeline. + * + * @param message + * @param now + */ + void writeMessage(final OfHeader message, final long now) { + final Object wrapper = makeMessageListenerWrapper(message); + parent.getChannel().write(wrapper); + } + + /** + * Wraps outgoing message and includes listener attached to this message + * which is send to OFEncoder for serialization. Correct wrapper is + * selected by communication pipeline. 
+     *
+     * @return {@link MessageListenerWrapper} wrapping the message, or
+     *         {@link UdpMessageListenerWrapper} when a datagram address is present
+     */
+    private Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
+        Preconditions.checkArgument(msg != null);
+
+        if (address == null) {
+            return new MessageListenerWrapper(msg, LOG_ENCODER_LISTENER);
+        }
+        return new UdpMessageListenerWrapper(msg, LOG_ENCODER_LISTENER, address);
+    }
+
+    /* NPEs are coming from the catch block of {@link OFEncoder#encode} and we don't wish to lose them */
+    private static final GenericFutureListener<Future<Void>> LOG_ENCODER_LISTENER = new GenericFutureListener<Future<Void>>() {
+
+        private final Logger LOGGER = LoggerFactory.getLogger("LogEncoderListener");
+
+        @Override
+        public void operationComplete(final Future<Void> future) throws Exception {
+            if (future.cause() != null) {
+                LOGGER.warn("Message encoding failed!", future.cause());
+            }
+        }
+    };
+
+    /**
+     * Perform a single flush operation. We keep it here so we do not generate
+     * synthetic accessors for private fields. Otherwise it could be moved into {@link #flushRunnable}.
+     */
+    protected void flush() {
+        // If the channel is gone, just flush whatever is not completed
+        if (!shuttingDown) {
+            LOG.trace("Dequeuing messages to channel {}", parent.getChannel());
+            writeAndFlush();
+            rescheduleFlush();
+        } else if (currentQueue.finishShutdown()) {
+            close();
+            LOG.debug("Channel {} shutdown complete", parent.getChannel());
+        } else {
+            LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
+            rescheduleFlush();
+        }
+    }
+
+    private void scheduleFlush() {
+        if (flushScheduled.compareAndSet(false, true)) {
+            LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
+            parent.getChannel().eventLoop().execute(flushRunnable);
+        } else {
+            LOG.trace("Flush task is already present on channel {}", parent.getChannel());
+        }
+    }
+
+    private void writeAndFlush() {
+        state = PipelineState.WRITING;
+
+        final long start = System.nanoTime();
+
+        final int entries = currentQueue.writeEntries(parent.getChannel(), start);
+        if (entries > 0) {
+            LOG.trace("Flushing channel {}", parent.getChannel());
+            parent.getChannel().flush();
+        }
+
+        if (LOG.isDebugEnabled()) {
+            final long stop = System.nanoTime();
+            LOG.debug("Flushed {} messages to channel {} in {}us", entries, parent.getChannel(),
+                TimeUnit.NANOSECONDS.toMicros(stop - start));
+        }
+
+        state = PipelineState.IDLE;
+    }
+
+    private void rescheduleFlush() {
+        /*
+         * We are almost ready to terminate. This is a bit tricky, because
+         * we do not want to have a race window where a message would be
+         * stuck on the queue without a flush being scheduled.
+         * So we mark ourselves as not running and then re-check if a
+         * flush out is needed. That will re-synchronize with other threads
+         * such that only one flush is scheduled at any given time.
+         */
+        if (!flushScheduled.compareAndSet(true, false)) {
+            LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
+        }
+
+        conditionalFlush();
+    }
+
+    /**
+     * Schedule a queue flush if it is not empty and the channel is found
+     * to be writable. May only be called from Netty context.
+     */
+    private void conditionalFlush() {
+        if (currentQueue.needsFlush()) {
+            if (shuttingDown || parent.getChannel().isWritable()) {
+                scheduleFlush();
+            } else {
+                LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
+            }
+        } else {
+            LOG.trace("Queue is empty, no flush needed");
+        }
+    }
+}
diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java
index ebf956f9..6b90daae 100644
--- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java
+++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java
@@ -8,81 +8,24 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
 import com.google.common.base.Preconditions;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
 import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nonnull;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
-    private static enum PipelineState {
-        /**
-         * Netty thread is potentially idle, no assumptions
-         * can be made about its state.
-         */
-        IDLE,
-        /**
-         * Netty thread is currently reading, once the read completes,
-         * if will flush the queue in the {@link #FLUSHING} state.
-         */
-        READING,
-        /**
-         * Netty thread is currently performing a flush on the queue.
-         * It will then transition to {@link #IDLE} state.
-         */
-        WRITING,
-    }
-
+final class OutboundQueueManager<T extends OutboundQueueHandler> extends AbstractOutboundQueueManager<T> {
     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
 
-    /**
-     * Default low write watermark. Channel will become writable when number of outstanding
-     * bytes dips below this value.
-     */
-    private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;
-
-    /**
-     * Default write high watermark. Channel will become un-writable when number of
-     * outstanding bytes hits this value.
- */ - private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2; - - private final AtomicBoolean flushScheduled = new AtomicBoolean(); - private final StackedOutboundQueue currentQueue; - private final ConnectionAdapterImpl parent; - private final InetSocketAddress address; private final int maxNonBarrierMessages; private final long maxBarrierNanos; - private final T handler; - - // Accessed concurrently - private volatile PipelineState state = PipelineState.IDLE; // Updated from netty only - private boolean alreadyReading; private boolean barrierTimerEnabled; private long lastBarrierNanos = System.nanoTime(); private int nonBarrierMessages; - private boolean shuttingDown; - - // Passed to executor to request triggering of flush - private final Runnable flushRunnable = new Runnable() { - @Override - public void run() { - flush(); - } - }; // Passed to executor to request a periodic barrier check private final Runnable barrierRunnable = new Runnable() { @@ -94,27 +37,13 @@ final class OutboundQueueManager extends Channel OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler, final int maxNonBarrierMessages, final long maxBarrierNanos) { - this.parent = Preconditions.checkNotNull(parent); - this.handler = Preconditions.checkNotNull(handler); + super(parent, address, handler); Preconditions.checkArgument(maxNonBarrierMessages > 0); this.maxNonBarrierMessages = maxNonBarrierMessages; Preconditions.checkArgument(maxBarrierNanos > 0); this.maxBarrierNanos = maxBarrierNanos; - this.address = address; - - currentQueue = new StackedOutboundQueue(this); - LOG.debug("Queue manager instantiated with queue {}", currentQueue); - handler.onConnectionQueueChanged(currentQueue); } - T getHandler() { - return handler; - } - - @Override - public void close() { - handler.onConnectionQueueChanged(null); - } private void scheduleBarrierTimer(final long now) { long next = lastBarrierNanos + maxBarrierNanos; @@ -136,33 +65,11 @@ final class OutboundQueueManager extends Channel return; } - currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null); + currentQueue.commitEntry(xid, getHandler().createBarrierRequest(xid), null); LOG.trace("Barrier XID {} scheduled", xid); } - /** - * Invoked whenever a message comes in from the switch. Runs matching - * on all active queues in an attempt to complete a previous request. - * - * @param message Potential response message - * @return True if the message matched a previous request, false otherwise. - */ - boolean onMessage(final OfHeader message) { - LOG.trace("Attempting to pair message {} to a request", message); - - return currentQueue.pairRequest(message); - } - - private void scheduleFlush() { - if (flushScheduled.compareAndSet(false, true)) { - LOG.trace("Scheduling flush task on channel {}", parent.getChannel()); - parent.getChannel().eventLoop().execute(flushRunnable); - } else { - LOG.trace("Flush task is already present on channel {}", parent.getChannel()); - } - } - /** * Periodic barrier check. */ @@ -187,186 +94,6 @@ final class OutboundQueueManager extends Channel } } - private void rescheduleFlush() { - /* - * We are almost ready to terminate. This is a bit tricky, because - * we do not want to have a race window where a message would be - * stuck on the queue without a flush being scheduled. - * - * So we mark ourselves as not running and then re-check if a - * flush out is needed. 
That will re-synchronized with other threads - * such that only one flush is scheduled at any given time. - */ - if (!flushScheduled.compareAndSet(true, false)) { - LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this); - } - - conditionalFlush(); - } - - private void writeAndFlush() { - state = PipelineState.WRITING; - - final long start = System.nanoTime(); - - final int entries = currentQueue.writeEntries(parent.getChannel(), start); - if (entries > 0) { - LOG.trace("Flushing channel {}", parent.getChannel()); - parent.getChannel().flush(); - } - - if (LOG.isDebugEnabled()) { - final long stop = System.nanoTime(); - LOG.debug("Flushed {} messages to channel {} in {}us", entries, - parent.getChannel(), TimeUnit.NANOSECONDS.toMicros(stop - start)); - } - - state = PipelineState.IDLE; - } - - /** - * Perform a single flush operation. We keep it here so we do not generate - * syntetic accessors for private fields. Otherwise it could be moved into - * {@link #flushRunnable}. - */ - protected void flush() { - // If the channel is gone, just flush whatever is not completed - if (!shuttingDown) { - LOG.trace("Dequeuing messages to channel {}", parent.getChannel()); - writeAndFlush(); - rescheduleFlush(); - } else if (currentQueue.finishShutdown()) { - handler.onConnectionQueueChanged(null); - LOG.debug("Channel {} shutdown complete", parent.getChannel()); - } else { - LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel()); - rescheduleFlush(); - } - } - - /** - * Schedule a queue flush if it is not empty and the channel is found - * to be writable. May only be called from Netty context. - */ - private void conditionalFlush() { - if (currentQueue.needsFlush()) { - if (shuttingDown || parent.getChannel().isWritable()) { - scheduleFlush(); - } else { - LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel()); - } - } else { - LOG.trace("Queue is empty, no flush needed"); - } - } - - @Override - public void channelActive(final ChannelHandlerContext ctx) throws Exception { - super.channelActive(ctx); - conditionalFlush(); - } - - @Override - public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { - /* - * Tune channel write buffering. We increase the writability window - * to ensure we can flush an entire queue segment in one go. We definitely - * want to keep the difference above 64k, as that will ensure we use jam-packed - * TCP packets. UDP will fragment as appropriate. - */ - ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK); - ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK); - - super.handlerAdded(ctx); - } - - @Override - public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception { - super.channelWritabilityChanged(ctx); - - // The channel is writable again. There may be a flush task on the way, but let's - // steal its work, potentially decreasing latency. Since there is a window between - // now and when it will run, it may still pick up some more work to do. 
- LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel()); - writeAndFlush(); - } - - @Override - public void channelInactive(final ChannelHandlerContext ctx) throws Exception { - super.channelInactive(ctx); - - LOG.debug("Channel {} initiating shutdown...", ctx.channel()); - - shuttingDown = true; - final long entries = currentQueue.startShutdown(ctx.channel()); - LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel()); - - scheduleFlush(); - } - - @Override - public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { - // Netty does not provide a 'start reading' callback, so this is our first - // (and repeated) chance to detect reading. Since this callback can be invoked - // multiple times, we keep a boolean we check. That prevents a volatile write - // on repeated invocations. It will be cleared in channelReadComplete(). - if (!alreadyReading) { - alreadyReading = true; - state = PipelineState.READING; - } - super.channelRead(ctx, msg); - } - - @Override - public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception { - super.channelReadComplete(ctx); - - // Run flush regardless of writability. This is not strictly required, as - // there may be a scheduled flush. Instead of canceling it, which is expensive, - // we'll steal its work. Note that more work may accumulate in the time window - // between now and when the task will run, so it may not be a no-op after all. - // - // The reason for this is to will the output buffer before we go into selection - // phase. This will make sure the pipe is full (in which case our next wake up - // will be the queue becoming writable). - writeAndFlush(); - } - - @Override - public String toString() { - return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get()); - } - - void ensureFlushing() { - // If the channel is not writable, there's no point in waking up, - // once we become writable, we will run a full flush - if (!parent.getChannel().isWritable()) { - return; - } - - // We are currently reading something, just a quick sync to ensure we will in fact - // flush state. - final PipelineState localState = state; - LOG.debug("Synchronize on pipeline state {}", localState); - switch (localState) { - case READING: - // Netty thread is currently reading, it will flush the pipeline once it - // finishes reading. This is a no-op situation. - break; - case WRITING: - case IDLE: - default: - // We cannot rely on the change being flushed, schedule a request - scheduleFlush(); - } - } - - void onEchoRequest(final EchoRequestMessage message) { - final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()) - .setVersion(message.getVersion()).setXid(message.getXid()).build(); - parent.getChannel().writeAndFlush(makeMessageListenerWrapper(reply)); - } - /** * Write a message into the underlying channel. * @@ -377,10 +104,9 @@ final class OutboundQueueManager extends Channel * measure System.nanoTime() for each barrier -- needlessly * adding overhead. 
*/ + @Override void writeMessage(final OfHeader message, final long now) { - final Object wrapper = makeMessageListenerWrapper(message); - parent.getChannel().write(wrapper); - + super.writeMessage(message, now); if (message instanceof BarrierInput) { LOG.trace("Barrier message seen, resetting counters"); nonBarrierMessages = 0; @@ -395,33 +121,4 @@ final class OutboundQueueManager extends Channel } } } - - /** - * Wraps outgoing message and includes listener attached to this message - * which is send to OFEncoder for serialization. Correct wrapper is - * selected by communication pipeline. - * - * @return - */ - private Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) { - Preconditions.checkArgument(msg != null); - - if (address == null) { - return new MessageListenerWrapper(msg, LOG_ENCODER_LISTENER); - } - return new UdpMessageListenerWrapper(msg, LOG_ENCODER_LISTENER, address); - } - - /* NPE are coming from {@link OFEncoder#encode} from catch block and we don't wish to lost it */ - private static final GenericFutureListener> LOG_ENCODER_LISTENER = new GenericFutureListener>() { - - private final Logger LOGGER = LoggerFactory.getLogger("LogEncoderListener"); - - @Override - public void operationComplete(final Future future) throws Exception { - if (future.cause() != null) { - LOGGER.warn("Message encoding fail !", future.cause()); - } - } - }; } diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java index efe71aa7..1da6dd3d 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java @@ -34,7 +34,7 @@ final class StackedOutboundQueue implements OutboundQueue { private final List unflushedSegments = new ArrayList<>(2); @GuardedBy("unflushedSegments") private final List uncompletedSegments = new ArrayList<>(2); - private final OutboundQueueManager manager; + private final AbstractOutboundQueueManager manager; private volatile long allocatedXid = -1; private volatile long barrierXid = -1; @@ -46,7 +46,7 @@ final class StackedOutboundQueue implements OutboundQueue { // Accessed from Netty only private int flushOffset; - StackedOutboundQueue(final OutboundQueueManager manager) { + StackedOutboundQueue(final AbstractOutboundQueueManager manager) { this.manager = Preconditions.checkNotNull(manager); firstSegment = StackedSegment.create(0L); uncompletedSegments.add(firstSegment); @@ -243,9 +243,8 @@ final class StackedOutboundQueue implements OutboundQueue { if (bXid >= fXid) { LOG.debug("Barrier found at XID {} (currently at {})", bXid, fXid); return null; - } else { - return reserveEntry(); } + return reserveEntry(); } boolean pairRequest(final OfHeader message) { -- 2.36.6
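
Note (not part of the patch): a minimal sketch of how a concrete manager could reuse the new abstract parent once barrier handling is optional, which is the direction the commit subject points at. The class name OutboundQueueManagerNoBarrier and the decision to skip all barrier bookkeeping are assumptions made purely for illustration; only the constructor and the writeMessage() hook it relies on are introduced by this patch.

package org.opendaylight.openflowjava.protocol.impl.core.connection;

import java.net.InetSocketAddress;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;

/**
 * Hypothetical subclass: relies entirely on the shared channel and flush handling
 * inherited from AbstractOutboundQueueManager and adds no barrier scheduling of its own.
 */
final class OutboundQueueManagerNoBarrier<T extends OutboundQueueHandler>
        extends AbstractOutboundQueueManager<T> {

    OutboundQueueManagerNoBarrier(final ConnectionAdapterImpl parent, final InetSocketAddress address,
            final T handler) {
        super(parent, address, handler);
    }

    @Override
    void writeMessage(final OfHeader message, final long now) {
        // No barrier counters or timers to update here; hand the message straight
        // to the shared write path provided by the abstract parent.
        super.writeMessage(message, now);
    }
}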