Fix precondition format string
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
index 43cbf02792bfc45ae62b4fd917b688313ee3454f..609e23c16acf52616d113eda51dbc6423f185970 100644 (file)
@@ -15,11 +15,14 @@ import java.util.ArrayDeque;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Queue;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +65,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     private boolean barrierTimerEnabled;
     private int nonBarrierMessages;
     private long lastXid = 0;
+    private Integer shutdownOffset;
 
     // Passed to executor to request triggering of flush
     private final Runnable flushRunnable = new Runnable() {
@@ -70,6 +74,8 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             flush();
         }
     };
+
+    // Passed to executor to request a periodic barrier check
     private final Runnable barrierRunnable = new Runnable() {
         @Override
         public void run() {
@@ -103,9 +109,9 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
 
     private void retireQueue(final OutboundQueueImpl queue) {
         if (queueCache.offer(queue)) {
-            LOG.debug("Saving queue {} for later reuse", queue);
+            LOG.trace("Saving queue {} for later reuse", queue);
         } else {
-            LOG.debug("Queue {} thrown away", queue);
+            LOG.trace("Queue {} thrown away", queue);
         }
     }
 
@@ -117,10 +123,10 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         final OutboundQueueImpl queue;
         if (cached != null) {
             queue = cached.reuse(baseXid);
-            LOG.debug("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
+            LOG.trace("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
         } else {
             queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
-            LOG.debug("Allocated new queue {} on channel {}", queue, parent.getChannel());
+            LOG.trace("Allocated new queue {} on channel {}", queue, parent.getChannel());
         }
 
         activeQueues.add(queue);
@@ -131,12 +137,12 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     private void scheduleBarrierTimer(final long now) {
         long next = lastBarrierNanos + maxBarrierNanos;
         if (next < now) {
-            LOG.debug("Attempted to schedule barrier in the past, reset maximum)");
+            LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
             next = now + maxBarrierNanos;
         }
 
         final long delay = next - now;
-        LOG.debug("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
+        LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
         parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
         barrierTimerEnabled = true;
     }
@@ -144,12 +150,12 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     private void scheduleBarrierMessage() {
         final Long xid = currentQueue.reserveBarrierIfNeeded();
         if (xid == null) {
-            LOG.debug("Queue {} already contains a barrier, not scheduling one", currentQueue);
+            LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
             return;
         }
 
         currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
-        LOG.debug("Barrier XID {} scheduled", xid);
+        LOG.trace("Barrier XID {} scheduled", xid);
     }
 
     /**
@@ -176,13 +182,13 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         }
 
         if (message instanceof BarrierInput) {
-            LOG.debug("Barrier message seen, resetting counters");
+            LOG.trace("Barrier message seen, resetting counters");
             nonBarrierMessages = 0;
             lastBarrierNanos = now;
         } else {
             nonBarrierMessages++;
             if (nonBarrierMessages >= queueSize) {
-                LOG.debug("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
+                LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
                 scheduleBarrierMessage();
             } else if (!barrierTimerEnabled) {
                 scheduleBarrierTimer(now);
@@ -200,7 +206,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
      * @return True if the message matched a previous request, false otherwise.
      */
     boolean onMessage(final OfHeader message) {
-        LOG.debug("Attempting to pair message {} to a request", message);
+        LOG.trace("Attempting to pair message {} to a request", message);
 
         Iterator<OutboundQueueImpl> it = activeQueues.iterator();
         while (it.hasNext()) {
@@ -211,12 +217,12 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
                 continue;
             }
 
-            LOG.debug("Queue {} accepted response {}", queue, message);
+            LOG.trace("Queue {} accepted response {}", queue, message);
 
             // This has been a barrier request, we need to flush all
             // previous queues
             if (entry.isBarrier() && activeQueues.size() > 1) {
-                LOG.debug("Queue {} indicated request was a barrier", queue);
+                LOG.trace("Queue {} indicated request was a barrier", queue);
 
                 it = activeQueues.iterator();
                 while (it.hasNext()) {
@@ -225,7 +231,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
                     // We want to complete all queues before the current one, we will
                     // complete the current queue below
                     if (!queue.equals(q)) {
-                        LOG.debug("Queue {} is implied finished", q);
+                        LOG.trace("Queue {} is implied finished", q);
                         q.completeAll();
                         it.remove();
                         retireQueue(q);
@@ -236,7 +242,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             }
 
             if (queue.isFinished()) {
-                LOG.debug("Queue {} is finished", queue);
+                LOG.trace("Queue {} is finished", queue);
                 it.remove();
                 retireQueue(queue);
             }
@@ -249,15 +255,11 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     }
 
     private void scheduleFlush() {
-        if (parent.getChannel().isWritable()) {
-            if (flushScheduled.compareAndSet(false, true)) {
-                LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
-                parent.getChannel().eventLoop().execute(flushRunnable);
-            } else {
-                LOG.trace("Flush task is already present on channel {}", parent.getChannel());
-            }
+        if (flushScheduled.compareAndSet(false, true)) {
+            LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
+            parent.getChannel().eventLoop().execute(flushRunnable);
         } else {
-            LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel());
+            LOG.trace("Flush task is already present on channel {}", parent.getChannel());
         }
     }
 
@@ -272,8 +274,8 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     protected void barrier() {
         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
         barrierTimerEnabled = false;
-        if (currentQueue == null) {
-            LOG.debug("Channel shut down, not processing barrier");
+        if (shutdownOffset != null) {
+            LOG.trace("Channel shut down, not processing barrier");
             return;
         }
 
@@ -283,17 +285,68 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
             // FIXME: we should be tracking requests/responses instead of this
             if (nonBarrierMessages == 0) {
-                LOG.debug("No messages written since last barrier, not issuing one");
+                LOG.trace("No messages written since last barrier, not issuing one");
             } else {
                 scheduleBarrierMessage();
             }
         }
     }
 
+    private void rescheduleFlush() {
+        /*
+         * We are almost ready to terminate. This is a bit tricky, because
+         * we do not want to have a race window where a message would be
+         * stuck on the queue without a flush being scheduled.
+         *
+         * So we mark ourselves as not running and then re-check if a
+         * flush out is needed. That will re-synchronized with other threads
+         * such that only one flush is scheduled at any given time.
+         */
+        if (!flushScheduled.compareAndSet(true, false)) {
+            LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
+        }
+
+        conditionalFlush();
+    }
+
+    private void shutdownFlush() {
+        long entries = 0;
+
+        // Fail all queues
+        final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
+        while (it.hasNext()) {
+            final OutboundQueueImpl queue = it.next();
+
+            entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
+            if (queue.isFinished()) {
+                LOG.trace("Cleared queue {}", queue);
+                it.remove();
+            }
+        }
+
+        LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
+
+        Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet");
+        if (currentQueue.isShutdown(shutdownOffset)) {
+            currentQueue = null;
+            handler.onConnectionQueueChanged(null);
+            LOG.debug("Channel {} shutdown complete", parent.getChannel());
+        } else {
+            LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
+            rescheduleFlush();
+        }
+    }
+
     /**
      * Perform a single flush operation.
      */
     protected void flush() {
+        // If the channel is gone, just flush whatever is not completed
+        if (shutdownOffset != null) {
+            shutdownFlush();
+            return;
+        }
+
         final long start = System.nanoTime();
         final long deadline = start + maxWorkTime;
 
@@ -343,29 +396,15 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         LOG.debug("Flushed {} messages in {}us to channel {}",
                 messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
 
-        /*
-         * We are almost ready to terminate. This is a bit tricky, because
-         * we do not want to have a race window where a message would be
-         * stuck on the queue without a flush being scheduled.
-         *
-         * So we mark ourselves as not running and then re-check if a
-         * flush out is needed. That will re-synchronized with other threads
-         * such that only one flush is scheduled at any given time.
-         */
-        if (!flushScheduled.compareAndSet(true, false)) {
-            LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
-        }
-
-        conditionalFlush();
+        rescheduleFlush();
     }
 
-
     /**
      * Schedule a queue flush if it is not empty and the channel is found
      * to be writable. May only be called from Netty context.
      */
     private void conditionalFlush() {
-        if (!currentQueue.isEmpty()) {
+        if (currentQueue.needsFlush() && (shutdownOffset != null || parent.getChannel().isWritable())) {
             scheduleFlush();
         } else {
             LOG.trace("Queue is empty, no flush needed");
@@ -393,21 +432,30 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
 
-        long entries = 0;
-        LOG.debug("Channel shutdown, flushing queue...");
-        handler.onConnectionQueueChanged(null);
+        LOG.debug("Channel {} initiating shutdown...", parent.getChannel());
 
-        final Throwable cause = new RejectedExecutionException("Channel disconnected");
-        for (OutboundQueueImpl queue : activeQueues) {
-            entries += queue.failAll(cause);
-        }
-        activeQueues.clear();
-
-        LOG.debug("Flushed {} queue entries", entries);
+        /*
+         * We are dealing with a multi-threaded shutdown, as the user may still
+         * be reserving entries in the queue. We are executing in a netty thread,
+         * so neither flush nor barrier can be running, which is good news.
+         *
+         * We will eat up all the slots in the queue here and mark the offset first
+         * reserved offset and free up all the cached queues. We then schedule
+         * the flush task, which will deal with the rest of the shutdown process.
+         */
+        shutdownOffset = currentQueue.startShutdown();
+        queueCache.clear();
+        LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
+        scheduleFlush();
     }
 
     @Override
     public String toString() {
         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
     }
+
+    void onEchoRequest(final EchoRequestMessage message) {
+        final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()).setVersion(message.getVersion()).setXid(message.getXid()).build();
+        parent.getChannel().writeAndFlush(reply);
+    }
 }