X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflow-protocol-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowjava%2Fprotocol%2Fimpl%2Fcore%2Fconnection%2FOutboundQueueManager.java;h=5c9a3515cea1de1a0d28a7d29c2d3c646dd9996c;hb=763e430f61196740c5ea1c7356022396adf55065;hp=43cbf02792bfc45ae62b4fd917b688313ee3454f;hpb=096d3de0bc39bef5bb0493e721d794e850a5b49d;p=openflowjava.git diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java index 43cbf027..5c9a3515 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java @@ -15,9 +15,9 @@ import java.util.ArrayDeque; import java.util.Iterator; import java.util.LinkedList; import java.util.Queue; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; @@ -103,9 +103,9 @@ final class OutboundQueueManager extends Channel private void retireQueue(final OutboundQueueImpl queue) { if (queueCache.offer(queue)) { - LOG.debug("Saving queue {} for later reuse", queue); + LOG.trace("Saving queue {} for later reuse", queue); } else { - LOG.debug("Queue {} thrown away", queue); + LOG.trace("Queue {} thrown away", queue); } } @@ -117,10 +117,10 @@ final class OutboundQueueManager extends Channel final OutboundQueueImpl queue; if (cached != null) { queue = cached.reuse(baseXid); - LOG.debug("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel()); + LOG.trace("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel()); } else { queue = new OutboundQueueImpl(this, baseXid, queueSize + 1); - LOG.debug("Allocated new queue {} on channel {}", queue, parent.getChannel()); + LOG.trace("Allocated new queue {} on channel {}", queue, parent.getChannel()); } activeQueues.add(queue); @@ -131,12 +131,12 @@ final class OutboundQueueManager extends Channel private void scheduleBarrierTimer(final long now) { long next = lastBarrierNanos + maxBarrierNanos; if (next < now) { - LOG.debug("Attempted to schedule barrier in the past, reset maximum)"); + LOG.trace("Attempted to schedule barrier in the past, reset maximum)"); next = now + maxBarrierNanos; } final long delay = next - now; - LOG.debug("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay)); + LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay)); parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS); barrierTimerEnabled = true; } @@ -144,12 +144,12 @@ final class OutboundQueueManager extends Channel private void scheduleBarrierMessage() { final Long xid = currentQueue.reserveBarrierIfNeeded(); if (xid == null) { - LOG.debug("Queue {} already contains a barrier, not scheduling one", currentQueue); + LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue); return; } currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null); - LOG.debug("Barrier XID {} scheduled", xid); + LOG.trace("Barrier XID {} scheduled", xid); } /** @@ -176,13 +176,13 @@ final class OutboundQueueManager extends Channel } if (message instanceof BarrierInput) { - LOG.debug("Barrier message seen, resetting counters"); + LOG.trace("Barrier message seen, resetting counters"); nonBarrierMessages = 0; lastBarrierNanos = now; } else { nonBarrierMessages++; if (nonBarrierMessages >= queueSize) { - LOG.debug("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages); + LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages); scheduleBarrierMessage(); } else if (!barrierTimerEnabled) { scheduleBarrierTimer(now); @@ -200,7 +200,7 @@ final class OutboundQueueManager extends Channel * @return True if the message matched a previous request, false otherwise. */ boolean onMessage(final OfHeader message) { - LOG.debug("Attempting to pair message {} to a request", message); + LOG.trace("Attempting to pair message {} to a request", message); Iterator it = activeQueues.iterator(); while (it.hasNext()) { @@ -211,12 +211,12 @@ final class OutboundQueueManager extends Channel continue; } - LOG.debug("Queue {} accepted response {}", queue, message); + LOG.trace("Queue {} accepted response {}", queue, message); // This has been a barrier request, we need to flush all // previous queues if (entry.isBarrier() && activeQueues.size() > 1) { - LOG.debug("Queue {} indicated request was a barrier", queue); + LOG.trace("Queue {} indicated request was a barrier", queue); it = activeQueues.iterator(); while (it.hasNext()) { @@ -225,7 +225,7 @@ final class OutboundQueueManager extends Channel // We want to complete all queues before the current one, we will // complete the current queue below if (!queue.equals(q)) { - LOG.debug("Queue {} is implied finished", q); + LOG.trace("Queue {} is implied finished", q); q.completeAll(); it.remove(); retireQueue(q); @@ -236,7 +236,7 @@ final class OutboundQueueManager extends Channel } if (queue.isFinished()) { - LOG.debug("Queue {} is finished", queue); + LOG.trace("Queue {} is finished", queue); it.remove(); retireQueue(queue); } @@ -273,7 +273,7 @@ final class OutboundQueueManager extends Channel LOG.debug("Channel {} barrier timer expired", parent.getChannel()); barrierTimerEnabled = false; if (currentQueue == null) { - LOG.debug("Channel shut down, not processing barrier"); + LOG.trace("Channel shut down, not processing barrier"); return; } @@ -283,7 +283,7 @@ final class OutboundQueueManager extends Channel LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast); // FIXME: we should be tracking requests/responses instead of this if (nonBarrierMessages == 0) { - LOG.debug("No messages written since last barrier, not issuing one"); + LOG.trace("No messages written since last barrier, not issuing one"); } else { scheduleBarrierMessage(); } @@ -397,9 +397,8 @@ final class OutboundQueueManager extends Channel LOG.debug("Channel shutdown, flushing queue..."); handler.onConnectionQueueChanged(null); - final Throwable cause = new RejectedExecutionException("Channel disconnected"); for (OutboundQueueImpl queue : activeQueues) { - entries += queue.failAll(cause); + entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED); } activeQueues.clear();