Revert "Bug 5118 - Unsent messages reported as failed after disconnect" 23/35723/1
authorThanh Ha <thanh.ha@linuxfoundation.org>
Fri, 4 Mar 2016 01:55:06 +0000 (20:55 -0500)
committerThanh Ha <thanh.ha@linuxfoundation.org>
Fri, 4 Mar 2016 01:56:03 +0000 (20:56 -0500)
This reverts commit 42d77fc7c0b4fc944b270a29f15ae600fc494cb2.

Change-Id: Iac6ecd2af999ccae49afc8f9d88a68431f0045b5
Signed-off-by: Thanh Ha <thanh.ha@linuxfoundation.org>
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/PipelineHandlers.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java

index 88144d47be163db2bbffb9303a11c8174508867a..51ac5482bb1fb95cd5573a370ffce5a4fda9003f 100644 (file)
@@ -44,9 +44,9 @@ public enum PipelineHandlers {
      */
     DELEGATING_INBOUND_HANDLER,
     /**
-     * Performs configurable efficient flushing
+     * Performs efficient flushing
      */
-    CHANNEL_OUTBOUND_QUEUE_MANAGER,
+    CHANNEL_OUTBOUNF_QUEUE,
     /**
      * Decodes incoming messages into message frames
      * and filters them based on version supported
index fdcc1f3e1826a5502529bccef5a3e9942c349ca8..8febb15865f5ffbe024ff84e4cee44b04e91ae80 100644 (file)
@@ -163,22 +163,14 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O ex
 
     @Override
     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
-        // First of all, delegates disconnect event notification into ConnectionAdapter -> OF Plugin -> queue.close()
-        // -> queueHandler.onConnectionQueueChanged(null). The last call causes that no more entries are enqueued
-        // in the queue.
         super.channelInactive(ctx);
 
         LOG.debug("Channel {} initiating shutdown...", ctx.channel());
 
-        // Then we start queue shutdown, start counting written messages (so that we don't keep sending messages
-        // indefinitely) and failing not completed entries.
         shuttingDown = true;
         final long entries = currentQueue.startShutdown(ctx.channel());
         LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
 
-        // Finally, we schedule flush task that will take care of unflushed entries. We also cover the case,
-        // when there is more than shutdownOffset messages enqueued in unflushed segments
-        // (AbstractStackedOutboundQueue#finishShutdown()).
         scheduleFlush();
     }
 
index 759633354024f573697de62308fd944715dfbc76..a32c1ad1c4c55981323ba973e1f5aeecb970932b 100644 (file)
@@ -260,31 +260,16 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue {
             final long xid = LAST_XID_OFFSET_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
             shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
 
-            // Fails all uncompleted entries, because they will never be completed due to disconnected channel.
-            return lockedFailSegments(uncompletedSegments.iterator());
+            return lockedShutdownFlush();
         }
     }
 
-    /**
-     * Checks if the shutdown is in final phase -> all allowed entries (number of entries < shutdownOffset) are flushed
-     * and fails all not completed entries (if in final phase)
-     * @return true if in final phase, false if a flush is needed
-     */
     boolean finishShutdown() {
-        boolean needsFlush;
         synchronized (unflushedSegments) {
-            // Fails all entries, that were flushed in shutdownOffset (became uncompleted)
-            // - they will never be completed due to disconnected channel.
-            lockedFailSegments(uncompletedSegments.iterator());
-            // If no further flush is needed, than we fail all unflushed segments, so that each enqueued entry
-            // is reported as unsuccessful due to channel disconnection. No further entries should be enqueued
-            // by this time.
-            needsFlush = needsFlush();
-            if (!needsFlush) {
-                lockedFailSegments(unflushedSegments.iterator());
-            }
+            lockedShutdownFlush();
         }
-        return !needsFlush;
+
+        return !needsFlush();
     }
 
     protected OutboundQueueEntry getEntry(final Long xid) {
@@ -317,27 +302,22 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue {
         return fastSegment.getEntry(fastOffset);
     }
 
-    /**
-     * Fails not completed entries in segments and frees completed segments
-     * @param iterator list of segments to be failed
-     * @return number of failed entries
-     */
     @GuardedBy("unflushedSegments")
-    private long lockedFailSegments(Iterator<StackedSegment> iterator) {
+    private long lockedShutdownFlush() {
         long entries = 0;
 
         // Fail all queues
-        while (iterator.hasNext()) {
-            final StackedSegment segment = iterator.next();
+        final Iterator<StackedSegment> it = uncompletedSegments.iterator();
+        while (it.hasNext()) {
+            final StackedSegment segment = it.next();
 
             entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
             if (segment.isComplete()) {
                 LOG.trace("Cleared segment {}", segment);
-                iterator.remove();
+                it.remove();
             }
         }
 
         return entries;
     }
-
 }
index 7c10be16208c60cc6234336a2bffaa489703bd93..37635c033122dbb96a1d67500665ad5806f8b855 100644 (file)
@@ -203,11 +203,7 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i
         outputManager = ret;
         /* we don't need it anymore */
         channel.pipeline().remove(output);
-        // OutboundQueueManager is put before DelegatingInboundHandler because otherwise channelInactive event would
-        // be first processed in OutboundQueueManager and then in ConnectionAdapter (and Openflowplugin). This might
-        // cause problems because we are shutting down the queue before Openflowplugin knows about it.
-        channel.pipeline().addBefore(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),
-                PipelineHandlers.CHANNEL_OUTBOUND_QUEUE_MANAGER.name(), outputManager);
+        channel.pipeline().addLast(outputManager);
 
         return new OutboundQueueHandlerRegistrationImpl<T>(handler) {
             @Override