BUG-3219: clear queue array before stashing
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
index 4864b95d655d8e63a2dc9851a364f060eadee1d5..f2a4b0e551b9e84162aad8118a2651e11984a5d5 100644 (file)
@@ -15,11 +15,14 @@ import java.util.ArrayDeque;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Queue;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +65,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     private boolean barrierTimerEnabled;
     private int nonBarrierMessages;
     private long lastXid = 0;
+    private Integer shutdownOffset;
 
     // Passed to executor to request triggering of flush
     private final Runnable flushRunnable = new Runnable() {
@@ -70,6 +74,8 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             flush();
         }
     };
+
+    // Passed to executor to request a periodic barrier check
     private final Runnable barrierRunnable = new Runnable() {
         @Override
         public void run() {
@@ -103,6 +109,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
 
     private void retireQueue(final OutboundQueueImpl queue) {
         if (queueCache.offer(queue)) {
+            queue.retire();
             LOG.trace("Saving queue {} for later reuse", queue);
         } else {
             LOG.trace("Queue {} thrown away", queue);
@@ -249,15 +256,11 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     }
 
     private void scheduleFlush() {
-        if (parent.getChannel().isWritable()) {
-            if (flushScheduled.compareAndSet(false, true)) {
-                LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
-                parent.getChannel().eventLoop().execute(flushRunnable);
-            } else {
-                LOG.trace("Flush task is already present on channel {}", parent.getChannel());
-            }
+        if (flushScheduled.compareAndSet(false, true)) {
+            LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
+            parent.getChannel().eventLoop().execute(flushRunnable);
         } else {
-            LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel());
+            LOG.trace("Flush task is already present on channel {}", parent.getChannel());
         }
     }
 
@@ -272,7 +275,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     protected void barrier() {
         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
         barrierTimerEnabled = false;
-        if (currentQueue == null) {
+        if (shutdownOffset != null) {
             LOG.trace("Channel shut down, not processing barrier");
             return;
         }
@@ -290,10 +293,61 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         }
     }
 
+    private void rescheduleFlush() {
+        /*
+         * We are almost ready to terminate. This is a bit tricky, because
+         * we do not want to have a race window where a message would be
+         * stuck on the queue without a flush being scheduled.
+         *
+         * So we mark ourselves as not running and then re-check if a
+         * flush out is needed. That will re-synchronized with other threads
+         * such that only one flush is scheduled at any given time.
+         */
+        if (!flushScheduled.compareAndSet(true, false)) {
+            LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
+        }
+
+        conditionalFlush();
+    }
+
+    private void shutdownFlush() {
+        long entries = 0;
+
+        // Fail all queues
+        final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
+        while (it.hasNext()) {
+            final OutboundQueueImpl queue = it.next();
+
+            entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
+            if (queue.isFinished()) {
+                LOG.trace("Cleared queue {}", queue);
+                it.remove();
+            }
+        }
+
+        LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
+
+        Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet");
+        if (currentQueue.isShutdown(shutdownOffset)) {
+            currentQueue = null;
+            handler.onConnectionQueueChanged(null);
+            LOG.debug("Channel {} shutdown complete", parent.getChannel());
+        } else {
+            LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
+            rescheduleFlush();
+        }
+    }
+
     /**
      * Perform a single flush operation.
      */
     protected void flush() {
+        // If the channel is gone, just flush whatever is not completed
+        if (shutdownOffset != null) {
+            shutdownFlush();
+            return;
+        }
+
         final long start = System.nanoTime();
         final long deadline = start + maxWorkTime;
 
@@ -343,29 +397,15 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         LOG.debug("Flushed {} messages in {}us to channel {}",
                 messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
 
-        /*
-         * We are almost ready to terminate. This is a bit tricky, because
-         * we do not want to have a race window where a message would be
-         * stuck on the queue without a flush being scheduled.
-         *
-         * So we mark ourselves as not running and then re-check if a
-         * flush out is needed. That will re-synchronized with other threads
-         * such that only one flush is scheduled at any given time.
-         */
-        if (!flushScheduled.compareAndSet(true, false)) {
-            LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
-        }
-
-        conditionalFlush();
+        rescheduleFlush();
     }
 
-
     /**
      * Schedule a queue flush if it is not empty and the channel is found
      * to be writable. May only be called from Netty context.
      */
     private void conditionalFlush() {
-        if (!currentQueue.isEmpty()) {
+        if (currentQueue.needsFlush() && (shutdownOffset != null || parent.getChannel().isWritable())) {
             scheduleFlush();
         } else {
             LOG.trace("Queue is empty, no flush needed");
@@ -393,21 +433,30 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
 
-        long entries = 0;
-        LOG.debug("Channel shutdown, flushing queue...");
-        handler.onConnectionQueueChanged(null);
+        LOG.debug("Channel {} initiating shutdown...", parent.getChannel());
 
-        final Throwable cause = new RejectedExecutionException("Channel disconnected");
-        for (OutboundQueueImpl queue : activeQueues) {
-            entries += queue.failAll(cause);
-        }
-        activeQueues.clear();
-
-        LOG.debug("Flushed {} queue entries", entries);
+        /*
+         * We are dealing with a multi-threaded shutdown, as the user may still
+         * be reserving entries in the queue. We are executing in a netty thread,
+         * so neither flush nor barrier can be running, which is good news.
+         *
+         * We will eat up all the slots in the queue here and mark the offset first
+         * reserved offset and free up all the cached queues. We then schedule
+         * the flush task, which will deal with the rest of the shutdown process.
+         */
+        shutdownOffset = currentQueue.startShutdown();
+        queueCache.clear();
+        LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
+        scheduleFlush();
     }
 
     @Override
     public String toString() {
         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
     }
+
+    void onEchoRequest(final EchoRequestMessage message) {
+        final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()).setVersion(message.getVersion()).setXid(message.getXid()).build();
+        parent.getChannel().writeAndFlush(reply);
+    }
 }