Flush all entries on channel shutdown 36/7636/1
authorRobert Varga <rovarga@cisco.com>
Tue, 3 Jun 2014 09:16:00 +0000 (11:16 +0200)
committerRobert Varga <rovarga@cisco.com>
Tue, 3 Jun 2014 11:01:33 +0000 (13:01 +0200)
This just makes sure we flush out outstanding queue quickly.

Change-Id: Ib1f7262545f012fe8c418cbe980cc63467a4f654
Signed-off-by: Robert Varga <rovarga@cisco.com>
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ChannelOutboundQueue.java

index a3c038f240a36f40aac0aebf678c5c6261921b48..d84cfc61a1272ea329cb00216e82a4ebb1522b47 100644 (file)
@@ -18,6 +18,7 @@ import io.netty.util.concurrent.GenericFutureListener;
 
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
@@ -241,6 +242,26 @@ final class ChannelOutboundQueue extends ChannelInboundHandlerAdapter {
         conditionalFlush(ctx);
     }
 
+    @Override
+    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+        super.channelInactive(ctx);
+
+        long entries = 0;
+        LOG.debug("Channel shutdown, flushing queue...");
+        final Future<Void> result = ctx.newFailedFuture(new RejectedExecutionException("Channel disconnected"));
+        while (true) {
+            final MessageHolder<?> e = queue.poll();
+            if (e == null) {
+                break;
+            }
+
+            e.takeListener().operationComplete(result);
+            entries++;
+        }
+
+        LOG.debug("Flushed {} queue entries", entries);
+    }
+
     @Override
     public String toString() {
         return String.format("Channel %s queue [%s messages flushing=%s]", channel, queue.size(), flushScheduled);