BUG-3219: introduce OutboundQueueException
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
index 43cbf02792bfc45ae62b4fd917b688313ee3454f..5c9a3515cea1de1a0d28a7d29c2d3c646dd9996c 100644 (file)
@@ -15,9 +15,9 @@ import java.util.ArrayDeque;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Queue;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
@@ -103,9 +103,9 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
 
     private void retireQueue(final OutboundQueueImpl queue) {
         if (queueCache.offer(queue)) {
-            LOG.debug("Saving queue {} for later reuse", queue);
+            LOG.trace("Saving queue {} for later reuse", queue);
         } else {
-            LOG.debug("Queue {} thrown away", queue);
+            LOG.trace("Queue {} thrown away", queue);
         }
     }
 
@@ -117,10 +117,10 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         final OutboundQueueImpl queue;
         if (cached != null) {
             queue = cached.reuse(baseXid);
-            LOG.debug("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
+            LOG.trace("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
         } else {
             queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
-            LOG.debug("Allocated new queue {} on channel {}", queue, parent.getChannel());
+            LOG.trace("Allocated new queue {} on channel {}", queue, parent.getChannel());
         }
 
         activeQueues.add(queue);
@@ -131,12 +131,12 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     private void scheduleBarrierTimer(final long now) {
         long next = lastBarrierNanos + maxBarrierNanos;
         if (next < now) {
-            LOG.debug("Attempted to schedule barrier in the past, reset maximum)");
+            LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
             next = now + maxBarrierNanos;
         }
 
         final long delay = next - now;
-        LOG.debug("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
+        LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
         parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
         barrierTimerEnabled = true;
     }
@@ -144,12 +144,12 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     private void scheduleBarrierMessage() {
         final Long xid = currentQueue.reserveBarrierIfNeeded();
         if (xid == null) {
-            LOG.debug("Queue {} already contains a barrier, not scheduling one", currentQueue);
+            LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
             return;
         }
 
         currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
-        LOG.debug("Barrier XID {} scheduled", xid);
+        LOG.trace("Barrier XID {} scheduled", xid);
     }
 
     /**
@@ -176,13 +176,13 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         }
 
         if (message instanceof BarrierInput) {
-            LOG.debug("Barrier message seen, resetting counters");
+            LOG.trace("Barrier message seen, resetting counters");
             nonBarrierMessages = 0;
             lastBarrierNanos = now;
         } else {
             nonBarrierMessages++;
             if (nonBarrierMessages >= queueSize) {
-                LOG.debug("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
+                LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
                 scheduleBarrierMessage();
             } else if (!barrierTimerEnabled) {
                 scheduleBarrierTimer(now);
@@ -200,7 +200,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
      * @return True if the message matched a previous request, false otherwise.
      */
     boolean onMessage(final OfHeader message) {
-        LOG.debug("Attempting to pair message {} to a request", message);
+        LOG.trace("Attempting to pair message {} to a request", message);
 
         Iterator<OutboundQueueImpl> it = activeQueues.iterator();
         while (it.hasNext()) {
@@ -211,12 +211,12 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
                 continue;
             }
 
-            LOG.debug("Queue {} accepted response {}", queue, message);
+            LOG.trace("Queue {} accepted response {}", queue, message);
 
             // This has been a barrier request, we need to flush all
             // previous queues
             if (entry.isBarrier() && activeQueues.size() > 1) {
-                LOG.debug("Queue {} indicated request was a barrier", queue);
+                LOG.trace("Queue {} indicated request was a barrier", queue);
 
                 it = activeQueues.iterator();
                 while (it.hasNext()) {
@@ -225,7 +225,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
                     // We want to complete all queues before the current one, we will
                     // complete the current queue below
                     if (!queue.equals(q)) {
-                        LOG.debug("Queue {} is implied finished", q);
+                        LOG.trace("Queue {} is implied finished", q);
                         q.completeAll();
                         it.remove();
                         retireQueue(q);
@@ -236,7 +236,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             }
 
             if (queue.isFinished()) {
-                LOG.debug("Queue {} is finished", queue);
+                LOG.trace("Queue {} is finished", queue);
                 it.remove();
                 retireQueue(queue);
             }
@@ -273,7 +273,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
         barrierTimerEnabled = false;
         if (currentQueue == null) {
-            LOG.debug("Channel shut down, not processing barrier");
+            LOG.trace("Channel shut down, not processing barrier");
             return;
         }
 
@@ -283,7 +283,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
             // FIXME: we should be tracking requests/responses instead of this
             if (nonBarrierMessages == 0) {
-                LOG.debug("No messages written since last barrier, not issuing one");
+                LOG.trace("No messages written since last barrier, not issuing one");
             } else {
                 scheduleBarrierMessage();
             }
@@ -397,9 +397,8 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         LOG.debug("Channel shutdown, flushing queue...");
         handler.onConnectionQueueChanged(null);
 
-        final Throwable cause = new RejectedExecutionException("Channel disconnected");
         for (OutboundQueueImpl queue : activeQueues) {
-            entries += queue.failAll(cause);
+            entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
         }
         activeQueues.clear();