Fix for the Bug 5637 : When closing OutboundQueue close() should go prior to finishSh...
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / AbstractOutboundQueueManager.java
index 99bec867e582ae60c2996c67a483ac014c53b679..64063943b5dfa8c192df08e3a863ce2d2c758714 100644 (file)
@@ -143,10 +143,11 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O ex
         // we'll steal its work. Note that more work may accumulate in the time window
         // between now and when the task will run, so it may not be a no-op after all.
         //
-        // The reason for this is to will the output buffer before we go into selection
+        // The reason for this is to fill the output buffer before we go into selection
         // phase. This will make sure the pipe is full (in which case our next wake up
         // will be the queue becoming writable).
         writeAndFlush();
+        alreadyReading = false;
     }
 
     @Override
@@ -162,14 +163,22 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O ex
 
     @Override
     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+        // First of all, delegates disconnect event notification into ConnectionAdapter -> OF Plugin -> queue.close()
+        // -> queueHandler.onConnectionQueueChanged(null). The last call causes that no more entries are enqueued
+        // in the queue.
         super.channelInactive(ctx);
 
         LOG.debug("Channel {} initiating shutdown...", ctx.channel());
 
+        // Then we start queue shutdown, start counting written messages (so that we don't keep sending messages
+        // indefinitely) and failing not completed entries.
         shuttingDown = true;
         final long entries = currentQueue.startShutdown(ctx.channel());
         LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
 
+        // Finally, we schedule flush task that will take care of unflushed entries. We also cover the case,
+        // when there is more than shutdownOffset messages enqueued in unflushed segments
+        // (AbstractStackedOutboundQueue#finishShutdown()).
         scheduleFlush();
     }
 
@@ -258,7 +267,7 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O ex
      *
      * @return
      */
-    private Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
+    protected Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
         Preconditions.checkArgument(msg != null);
 
         if (address == null) {
@@ -290,12 +299,14 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O ex
             LOG.trace("Dequeuing messages to channel {}", parent.getChannel());
             writeAndFlush();
             rescheduleFlush();
-        } else if (currentQueue.finishShutdown()) {
-            close();
-            LOG.debug("Channel {} shutdown complete", parent.getChannel());
         } else {
-            LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
-            rescheduleFlush();
+            close();
+            if (currentQueue.finishShutdown()) {
+               LOG.debug("Channel {} shutdown complete", parent.getChannel());
+            } else {
+               LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
+               rescheduleFlush();
+            }
         }
     }