From aa13d46568a13f9d73f9ea90b2c14211b473dde6 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Sun, 17 May 2015 03:11:00 +0200 Subject: [PATCH] BUG-3219: Fix OutboundQueue cleanup on channel failure When the channel goes inactive, we still need to make sure that any entries that were reserved and not committed get flushed. Instead of perfoming a one-shot cleanup in channelInactive(), perform cleanup whenever flush() runs. When channel goes inactive, we just cleanup the obviously-freeable resources and ensure that a flush is scheduled. Change-Id: I48e1ceb51dcfafedb7352db5d952e9749cdfa50d Signed-off-by: Robert Varga (cherry picked from commit 44b028491689d8d89c2ffea2e7bc6bb2d80209fe) --- .../core/connection/OutboundQueueEntry.java | 3 +- .../core/connection/OutboundQueueImpl.java | 4 ++ .../core/connection/OutboundQueueManager.java | 45 ++++++++++++------- 3 files changed, 34 insertions(+), 18 deletions(-) diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java index c0c2f764..8b498579 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java @@ -9,6 +9,7 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; @@ -76,7 +77,7 @@ final class OutboundQueueEntry { return reallyComplete; } - void fail(final Throwable cause) { + void fail(final OutboundQueueException cause) { if (!completed) { completed = true; if (callback != null) { diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java index f61e8fff..6713d5ce 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java @@ -277,6 +277,10 @@ final class OutboundQueueImpl implements OutboundQueue { int ret = 0; for (int i = lastBarrierOffset + 1; i < queue.length; ++i) { final OutboundQueueEntry entry = queue[i]; + if (!entry.isCommitted()) { + break; + } + if (!entry.isCompleted()) { entry.fail(cause); ret++; diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java index 5c9a3515..5d36859d 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java @@ -70,6 +70,8 @@ final class OutboundQueueManager extends Channel flush(); } }; + + // Passed to executor to request a periodic barrier check private final Runnable barrierRunnable = new Runnable() { @Override public void run() { @@ -249,15 +251,11 @@ final class OutboundQueueManager extends Channel } private void scheduleFlush() { - if (parent.getChannel().isWritable()) { - if (flushScheduled.compareAndSet(false, true)) { - LOG.trace("Scheduling flush task on channel {}", parent.getChannel()); - parent.getChannel().eventLoop().execute(flushRunnable); - } else { - LOG.trace("Flush task is already present on channel {}", parent.getChannel()); - } + if (flushScheduled.compareAndSet(false, true)) { + LOG.trace("Scheduling flush task on channel {}", parent.getChannel()); + parent.getChannel().eventLoop().execute(flushRunnable); } else { - LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel()); + LOG.trace("Flush task is already present on channel {}", parent.getChannel()); } } @@ -294,6 +292,24 @@ final class OutboundQueueManager extends Channel * Perform a single flush operation. */ protected void flush() { + // If the channel is gone, just flush whatever is not completed + if (currentQueue == null) { + long entries = 0; + + final Iterator it = activeQueues.iterator(); + while (it.hasNext()) { + final OutboundQueueImpl queue = it.next(); + entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED); + if (queue.isFinished()) { + LOG.trace("Cleared queue {}", queue); + it.remove(); + } + } + + LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel()); + return; + } + final long start = System.nanoTime(); final long deadline = start + maxWorkTime; @@ -359,7 +375,6 @@ final class OutboundQueueManager extends Channel conditionalFlush(); } - /** * Schedule a queue flush if it is not empty and the channel is found * to be writable. May only be called from Netty context. @@ -393,16 +408,12 @@ final class OutboundQueueManager extends Channel public void channelInactive(final ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); - long entries = 0; - LOG.debug("Channel shutdown, flushing queue..."); + LOG.debug("Channel {} shutdown, flushing queue...", parent.getChannel()); handler.onConnectionQueueChanged(null); + currentQueue = null; + queueCache.clear(); - for (OutboundQueueImpl queue : activeQueues) { - entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED); - } - activeQueues.clear(); - - LOG.debug("Flushed {} queue entries", entries); + scheduleFlush(); } @Override -- 2.36.6