BUG-3219: Fix OutboundQueue cleanup on channel failure 17/20617/5
authorRobert Varga <rovarga@cisco.com>
Sun, 17 May 2015 01:11:00 +0000 (03:11 +0200)
committerRobert Varga <rovarga@cisco.com>
Mon, 18 May 2015 01:12:37 +0000 (03:12 +0200)
When the channel goes inactive, we still need to make sure that any
entries that were reserved and not committed get flushed. Instead of
perfoming a one-shot cleanup in channelInactive(), perform cleanup
whenever flush() runs.

When channel goes inactive, we just cleanup the obviously-freeable
resources and ensure that a flush is scheduled.

Change-Id: I48e1ceb51dcfafedb7352db5d952e9749cdfa50d
Signed-off-by: Robert Varga <rovarga@cisco.com>
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java

index c0c2f764f3aa88c9b12da74ed5878f298d74fa0b..8b498579eb79cdc99a786b0fb92f7ff147b9542f 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
@@ -76,7 +77,7 @@ final class OutboundQueueEntry {
         return reallyComplete;
     }
 
-    void fail(final Throwable cause) {
+    void fail(final OutboundQueueException cause) {
         if (!completed) {
             completed = true;
             if (callback != null) {
index f61e8fffa6d86abb58fa1028e425073e01796482..6713d5ceab000eb5df956833381dfd4af640674e 100644 (file)
@@ -277,6 +277,10 @@ final class OutboundQueueImpl implements OutboundQueue {
         int ret = 0;
         for (int i = lastBarrierOffset + 1; i < queue.length; ++i) {
             final OutboundQueueEntry entry = queue[i];
+            if (!entry.isCommitted()) {
+                break;
+            }
+
             if (!entry.isCompleted()) {
                 entry.fail(cause);
                 ret++;
index 5c9a3515cea1de1a0d28a7d29c2d3c646dd9996c..5d36859d6299a92dd83e7646b8ec4279f25c9bb7 100644 (file)
@@ -70,6 +70,8 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             flush();
         }
     };
+
+    // Passed to executor to request a periodic barrier check
     private final Runnable barrierRunnable = new Runnable() {
         @Override
         public void run() {
@@ -249,15 +251,11 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     }
 
     private void scheduleFlush() {
-        if (parent.getChannel().isWritable()) {
-            if (flushScheduled.compareAndSet(false, true)) {
-                LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
-                parent.getChannel().eventLoop().execute(flushRunnable);
-            } else {
-                LOG.trace("Flush task is already present on channel {}", parent.getChannel());
-            }
+        if (flushScheduled.compareAndSet(false, true)) {
+            LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
+            parent.getChannel().eventLoop().execute(flushRunnable);
         } else {
-            LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel());
+            LOG.trace("Flush task is already present on channel {}", parent.getChannel());
         }
     }
 
@@ -294,6 +292,24 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
      * Perform a single flush operation.
      */
     protected void flush() {
+        // If the channel is gone, just flush whatever is not completed
+        if (currentQueue == null) {
+            long entries = 0;
+
+            final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
+            while (it.hasNext()) {
+                final OutboundQueueImpl queue = it.next();
+                entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
+                if (queue.isFinished()) {
+                    LOG.trace("Cleared queue {}", queue);
+                    it.remove();
+                }
+            }
+
+            LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
+            return;
+        }
+
         final long start = System.nanoTime();
         final long deadline = start + maxWorkTime;
 
@@ -359,7 +375,6 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         conditionalFlush();
     }
 
-
     /**
      * Schedule a queue flush if it is not empty and the channel is found
      * to be writable. May only be called from Netty context.
@@ -393,16 +408,12 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
 
-        long entries = 0;
-        LOG.debug("Channel shutdown, flushing queue...");
+        LOG.debug("Channel {} shutdown, flushing queue...", parent.getChannel());
         handler.onConnectionQueueChanged(null);
+        currentQueue = null;
+        queueCache.clear();
 
-        for (OutboundQueueImpl queue : activeQueues) {
-            entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
-        }
-        activeQueues.clear();
-
-        LOG.debug("Flushed {} queue entries", entries);
+        scheduleFlush();
     }
 
     @Override