Bug 5118 - Unsent messages reported as failed after disconnect 26/35726/1
authorMichal Polkorab <michal.polkorab@pantheon.sk>
Wed, 27 Jan 2016 01:08:19 +0000 (02:08 +0100)
committerThanh Ha <thanh.ha@linuxfoundation.org>
Fri, 4 Mar 2016 04:03:29 +0000 (23:03 -0500)
 - if there were messages delivered to OutboundQueue and disconnect occured,
   unsent messages were not marked as failed, resulting in incorrect / no report
 - channelInactive in AbstractOutboundQueueManager was never triggered as the event
   was consumed in DelegatingInboundHandler

Change-Id: Iaaffbbe14feaec4afbda0a3d6b19699c1ec79e06
Signed-off-by: Michal Polkorab <michal.polkorab@pantheon.sk>
(cherry picked from commit 25677520a3fca1a925a0970efa20d586e1445e6f)

openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/PipelineHandlers.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java

index 51ac5482bb1fb95cd5573a370ffce5a4fda9003f..88144d47be163db2bbffb9303a11c8174508867a 100644 (file)
@@ -44,9 +44,9 @@ public enum PipelineHandlers {
      */
     DELEGATING_INBOUND_HANDLER,
     /**
-     * Performs efficient flushing
+     * Performs configurable efficient flushing
      */
-    CHANNEL_OUTBOUNF_QUEUE,
+    CHANNEL_OUTBOUND_QUEUE_MANAGER,
     /**
      * Decodes incoming messages into message frames
      * and filters them based on version supported
index 8febb15865f5ffbe024ff84e4cee44b04e91ae80..fdcc1f3e1826a5502529bccef5a3e9942c349ca8 100644 (file)
@@ -163,14 +163,22 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O ex
 
     @Override
     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+        // First of all, delegates disconnect event notification into ConnectionAdapter -> OF Plugin -> queue.close()
+        // -> queueHandler.onConnectionQueueChanged(null). The last call causes that no more entries are enqueued
+        // in the queue.
         super.channelInactive(ctx);
 
         LOG.debug("Channel {} initiating shutdown...", ctx.channel());
 
+        // Then we start queue shutdown, start counting written messages (so that we don't keep sending messages
+        // indefinitely) and failing not completed entries.
         shuttingDown = true;
         final long entries = currentQueue.startShutdown(ctx.channel());
         LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
 
+        // Finally, we schedule flush task that will take care of unflushed entries. We also cover the case,
+        // when there is more than shutdownOffset messages enqueued in unflushed segments
+        // (AbstractStackedOutboundQueue#finishShutdown()).
         scheduleFlush();
     }
 
index a32c1ad1c4c55981323ba973e1f5aeecb970932b..759633354024f573697de62308fd944715dfbc76 100644 (file)
@@ -260,16 +260,31 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue {
             final long xid = LAST_XID_OFFSET_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
             shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
 
-            return lockedShutdownFlush();
+            // Fails all uncompleted entries, because they will never be completed due to disconnected channel.
+            return lockedFailSegments(uncompletedSegments.iterator());
         }
     }
 
+    /**
+     * Checks if the shutdown is in final phase -> all allowed entries (number of entries < shutdownOffset) are flushed
+     * and fails all not completed entries (if in final phase)
+     * @return true if in final phase, false if a flush is needed
+     */
     boolean finishShutdown() {
+        boolean needsFlush;
         synchronized (unflushedSegments) {
-            lockedShutdownFlush();
+            // Fails all entries, that were flushed in shutdownOffset (became uncompleted)
+            // - they will never be completed due to disconnected channel.
+            lockedFailSegments(uncompletedSegments.iterator());
+            // If no further flush is needed, than we fail all unflushed segments, so that each enqueued entry
+            // is reported as unsuccessful due to channel disconnection. No further entries should be enqueued
+            // by this time.
+            needsFlush = needsFlush();
+            if (!needsFlush) {
+                lockedFailSegments(unflushedSegments.iterator());
+            }
         }
-
-        return !needsFlush();
+        return !needsFlush;
     }
 
     protected OutboundQueueEntry getEntry(final Long xid) {
@@ -302,22 +317,27 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue {
         return fastSegment.getEntry(fastOffset);
     }
 
+    /**
+     * Fails not completed entries in segments and frees completed segments
+     * @param iterator list of segments to be failed
+     * @return number of failed entries
+     */
     @GuardedBy("unflushedSegments")
-    private long lockedShutdownFlush() {
+    private long lockedFailSegments(Iterator<StackedSegment> iterator) {
         long entries = 0;
 
         // Fail all queues
-        final Iterator<StackedSegment> it = uncompletedSegments.iterator();
-        while (it.hasNext()) {
-            final StackedSegment segment = it.next();
+        while (iterator.hasNext()) {
+            final StackedSegment segment = iterator.next();
 
             entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
             if (segment.isComplete()) {
                 LOG.trace("Cleared segment {}", segment);
-                it.remove();
+                iterator.remove();
             }
         }
 
         return entries;
     }
+
 }
index 37635c033122dbb96a1d67500665ad5806f8b855..7c10be16208c60cc6234336a2bffaa489703bd93 100644 (file)
@@ -203,7 +203,11 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i
         outputManager = ret;
         /* we don't need it anymore */
         channel.pipeline().remove(output);
-        channel.pipeline().addLast(outputManager);
+        // OutboundQueueManager is put before DelegatingInboundHandler because otherwise channelInactive event would
+        // be first processed in OutboundQueueManager and then in ConnectionAdapter (and Openflowplugin). This might
+        // cause problems because we are shutting down the queue before Openflowplugin knows about it.
+        channel.pipeline().addBefore(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),
+                PipelineHandlers.CHANNEL_OUTBOUND_QUEUE_MANAGER.name(), outputManager);
 
         return new OutboundQueueHandlerRegistrationImpl<T>(handler) {
             @Override