Bug 5118 - Unsent messages reported as failed after disconnect
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / AbstractOutboundQueueManager.java
index 49c6ddfa9fb47c155796cbdb3d453cd36704c22d..fdcc1f3e1826a5502529bccef5a3e9942c349ca8 100644 (file)
@@ -29,7 +29,8 @@ import org.slf4j.LoggerFactory;
  * Class capsulate basic processing for stacking requests for netty channel
  * and provide functionality for pairing request/response device message communication.
  */
-abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter
+abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O extends AbstractStackedOutboundQueue>
+        extends ChannelInboundHandlerAdapter
         implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractOutboundQueueManager.class);
@@ -67,7 +68,7 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler> exte
     private final AtomicBoolean flushScheduled = new AtomicBoolean();
     protected final ConnectionAdapterImpl parent;
     protected final InetSocketAddress address;
-    protected final StackedOutboundQueue currentQueue;
+    protected final O currentQueue;
     private final T handler;
 
     // Accessed concurrently
@@ -89,12 +90,20 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler> exte
         this.parent = Preconditions.checkNotNull(parent);
         this.handler = Preconditions.checkNotNull(handler);
         this.address = address;
-        currentQueue = new StackedOutboundQueue(this);
+        /* Note: don't wish to use reflection here */
+        currentQueue = initializeStackedOutboudnqueue();
         LOG.debug("Queue manager instantiated with queue {}", currentQueue);
 
         handler.onConnectionQueueChanged(currentQueue);
     }
 
+    /**
+     * Method has to initialize some child of {@link AbstractStackedOutboundQueue}
+     *
+     * @return correct implementation of StacketOutboundqueue
+     */
+    protected abstract O initializeStackedOutboudnqueue();
+
     @Override
     public void close() {
         handler.onConnectionQueueChanged(null);
@@ -134,10 +143,11 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler> exte
         // we'll steal its work. Note that more work may accumulate in the time window
         // between now and when the task will run, so it may not be a no-op after all.
         //
-        // The reason for this is to will the output buffer before we go into selection
+        // The reason for this is to fill the output buffer before we go into selection
         // phase. This will make sure the pipe is full (in which case our next wake up
         // will be the queue becoming writable).
         writeAndFlush();
+        alreadyReading = false;
     }
 
     @Override
@@ -153,14 +163,22 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler> exte
 
     @Override
     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+        // First of all, delegates disconnect event notification into ConnectionAdapter -> OF Plugin -> queue.close()
+        // -> queueHandler.onConnectionQueueChanged(null). The last call causes that no more entries are enqueued
+        // in the queue.
         super.channelInactive(ctx);
 
         LOG.debug("Channel {} initiating shutdown...", ctx.channel());
 
+        // Then we start queue shutdown, start counting written messages (so that we don't keep sending messages
+        // indefinitely) and failing not completed entries.
         shuttingDown = true;
         final long entries = currentQueue.startShutdown(ctx.channel());
         LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
 
+        // Finally, we schedule flush task that will take care of unflushed entries. We also cover the case,
+        // when there is more than shutdownOffset messages enqueued in unflushed segments
+        // (AbstractStackedOutboundQueue#finishShutdown()).
         scheduleFlush();
     }
 
@@ -249,7 +267,7 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler> exte
      *
      * @return
      */
-    private Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
+    protected Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
         Preconditions.checkArgument(msg != null);
 
         if (address == null) {