Bug 5118 - Unsent messages reported as failed after disconnect
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / AbstractStackedOutboundQueue.java
index a32c1ad1c4c55981323ba973e1f5aeecb970932b..759633354024f573697de62308fd944715dfbc76 100644 (file)
@@ -260,16 +260,31 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue {
             final long xid = LAST_XID_OFFSET_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
             shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
 
-            return lockedShutdownFlush();
+            // Fails all uncompleted entries, because they will never be completed due to disconnected channel.
+            return lockedFailSegments(uncompletedSegments.iterator());
         }
     }
 
+    /**
+     * Checks if the shutdown is in final phase -> all allowed entries (number of entries < shutdownOffset) are flushed
+     * and fails all not completed entries (if in final phase)
+     * @return true if in final phase, false if a flush is needed
+     */
     boolean finishShutdown() {
+        boolean needsFlush;
         synchronized (unflushedSegments) {
-            lockedShutdownFlush();
+            // Fails all entries, that were flushed in shutdownOffset (became uncompleted)
+            // - they will never be completed due to disconnected channel.
+            lockedFailSegments(uncompletedSegments.iterator());
+            // If no further flush is needed, than we fail all unflushed segments, so that each enqueued entry
+            // is reported as unsuccessful due to channel disconnection. No further entries should be enqueued
+            // by this time.
+            needsFlush = needsFlush();
+            if (!needsFlush) {
+                lockedFailSegments(unflushedSegments.iterator());
+            }
         }
-
-        return !needsFlush();
+        return !needsFlush;
     }
 
     protected OutboundQueueEntry getEntry(final Long xid) {
@@ -302,22 +317,27 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue {
         return fastSegment.getEntry(fastOffset);
     }
 
+    /**
+     * Fails not completed entries in segments and frees completed segments
+     * @param iterator list of segments to be failed
+     * @return number of failed entries
+     */
     @GuardedBy("unflushedSegments")
-    private long lockedShutdownFlush() {
+    private long lockedFailSegments(Iterator<StackedSegment> iterator) {
         long entries = 0;
 
         // Fail all queues
-        final Iterator<StackedSegment> it = uncompletedSegments.iterator();
-        while (it.hasNext()) {
-            final StackedSegment segment = it.next();
+        while (iterator.hasNext()) {
+            final StackedSegment segment = iterator.next();
 
             entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
             if (segment.isComplete()) {
                 LOG.trace("Cleared segment {}", segment);
-                it.remove();
+                iterator.remove();
             }
         }
 
         return entries;
     }
+
 }