X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflow-protocol-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowjava%2Fprotocol%2Fimpl%2Fcore%2Fconnection%2FOutboundQueueManager.java;h=f2a4b0e551b9e84162aad8118a2651e11984a5d5;hb=fb53aa09a2412caccf65731f5868b70915567ec2;hp=5c9a3515cea1de1a0d28a7d29c2d3c646dd9996c;hpb=20ac1113a590c3067f755a9508f915898b0185c7;p=openflowjava.git diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java index 5c9a3515..f2a4b0e5 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java @@ -20,6 +20,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +65,7 @@ final class OutboundQueueManager extends Channel private boolean barrierTimerEnabled; private int nonBarrierMessages; private long lastXid = 0; + private Integer shutdownOffset; // Passed to executor to request triggering of flush private final Runnable flushRunnable = new Runnable() { @@ -70,6 +74,8 @@ final class OutboundQueueManager extends Channel flush(); } }; + + // Passed to executor to request a periodic barrier check private final Runnable barrierRunnable = new Runnable() { @Override public void run() { @@ -103,6 +109,7 @@ final class OutboundQueueManager extends Channel private void retireQueue(final OutboundQueueImpl queue) { if (queueCache.offer(queue)) { + queue.retire(); LOG.trace("Saving queue {} for later reuse", queue); } else { LOG.trace("Queue {} thrown away", queue); @@ -249,15 +256,11 @@ final class OutboundQueueManager extends Channel } private void scheduleFlush() { - if (parent.getChannel().isWritable()) { - if (flushScheduled.compareAndSet(false, true)) { - LOG.trace("Scheduling flush task on channel {}", parent.getChannel()); - parent.getChannel().eventLoop().execute(flushRunnable); - } else { - LOG.trace("Flush task is already present on channel {}", parent.getChannel()); - } + if (flushScheduled.compareAndSet(false, true)) { + LOG.trace("Scheduling flush task on channel {}", parent.getChannel()); + parent.getChannel().eventLoop().execute(flushRunnable); } else { - LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel()); + LOG.trace("Flush task is already present on channel {}", parent.getChannel()); } } @@ -272,7 +275,7 @@ final class OutboundQueueManager extends Channel protected void barrier() { LOG.debug("Channel {} barrier timer expired", parent.getChannel()); barrierTimerEnabled = false; - if (currentQueue == null) { + if (shutdownOffset != null) { LOG.trace("Channel shut down, not processing barrier"); return; } @@ -290,10 +293,61 @@ final class OutboundQueueManager extends Channel } } + private void rescheduleFlush() { + /* + * We are almost ready to terminate. This is a bit tricky, because + * we do not want to have a race window where a message would be + * stuck on the queue without a flush being scheduled. + * + * So we mark ourselves as not running and then re-check if a + * flush out is needed. That will re-synchronized with other threads + * such that only one flush is scheduled at any given time. + */ + if (!flushScheduled.compareAndSet(true, false)) { + LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this); + } + + conditionalFlush(); + } + + private void shutdownFlush() { + long entries = 0; + + // Fail all queues + final Iterator it = activeQueues.iterator(); + while (it.hasNext()) { + final OutboundQueueImpl queue = it.next(); + + entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED); + if (queue.isFinished()) { + LOG.trace("Cleared queue {}", queue); + it.remove(); + } + } + + LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel()); + + Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet"); + if (currentQueue.isShutdown(shutdownOffset)) { + currentQueue = null; + handler.onConnectionQueueChanged(null); + LOG.debug("Channel {} shutdown complete", parent.getChannel()); + } else { + LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel()); + rescheduleFlush(); + } + } + /** * Perform a single flush operation. */ protected void flush() { + // If the channel is gone, just flush whatever is not completed + if (shutdownOffset != null) { + shutdownFlush(); + return; + } + final long start = System.nanoTime(); final long deadline = start + maxWorkTime; @@ -343,29 +397,15 @@ final class OutboundQueueManager extends Channel LOG.debug("Flushed {} messages in {}us to channel {}", messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel()); - /* - * We are almost ready to terminate. This is a bit tricky, because - * we do not want to have a race window where a message would be - * stuck on the queue without a flush being scheduled. - * - * So we mark ourselves as not running and then re-check if a - * flush out is needed. That will re-synchronized with other threads - * such that only one flush is scheduled at any given time. - */ - if (!flushScheduled.compareAndSet(true, false)) { - LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this); - } - - conditionalFlush(); + rescheduleFlush(); } - /** * Schedule a queue flush if it is not empty and the channel is found * to be writable. May only be called from Netty context. */ private void conditionalFlush() { - if (!currentQueue.isEmpty()) { + if (currentQueue.needsFlush() && (shutdownOffset != null || parent.getChannel().isWritable())) { scheduleFlush(); } else { LOG.trace("Queue is empty, no flush needed"); @@ -393,20 +433,30 @@ final class OutboundQueueManager extends Channel public void channelInactive(final ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); - long entries = 0; - LOG.debug("Channel shutdown, flushing queue..."); - handler.onConnectionQueueChanged(null); - - for (OutboundQueueImpl queue : activeQueues) { - entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED); - } - activeQueues.clear(); + LOG.debug("Channel {} initiating shutdown...", parent.getChannel()); - LOG.debug("Flushed {} queue entries", entries); + /* + * We are dealing with a multi-threaded shutdown, as the user may still + * be reserving entries in the queue. We are executing in a netty thread, + * so neither flush nor barrier can be running, which is good news. + * + * We will eat up all the slots in the queue here and mark the offset first + * reserved offset and free up all the cached queues. We then schedule + * the flush task, which will deal with the rest of the shutdown process. + */ + shutdownOffset = currentQueue.startShutdown(); + queueCache.clear(); + LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset); + scheduleFlush(); } @Override public String toString() { return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get()); } + + void onEchoRequest(final EchoRequestMessage message) { + final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()).setVersion(message.getVersion()).setXid(message.getXid()).build(); + parent.getChannel().writeAndFlush(reply); + } }