From: Michal Polkorab Date: Wed, 27 Jan 2016 01:08:19 +0000 (+0100) Subject: Bug 5118 - Unsent messages reported as failed after disconnect X-Git-Tag: release/boron~19 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F83%2F35683%2F1;p=openflowjava.git Bug 5118 - Unsent messages reported as failed after disconnect - if there were messages delivered to OutboundQueue and disconnect occured, unsent messages were not marked as failed, resulting in incorrect / no report - channelInactive in AbstractOutboundQueueManager was never triggered as the event was consumed in DelegatingInboundHandler Change-Id: Ifd64c8f9346534a934d49a88ddd5c8f71cbb01e7 Signed-off-by: Michal Polkorab (cherry picked from commit 25677520a3fca1a925a0970efa20d586e1445e6f) --- diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/PipelineHandlers.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/PipelineHandlers.java index 51ac5482..88144d47 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/PipelineHandlers.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/PipelineHandlers.java @@ -44,9 +44,9 @@ public enum PipelineHandlers { */ DELEGATING_INBOUND_HANDLER, /** - * Performs efficient flushing + * Performs configurable efficient flushing */ - CHANNEL_OUTBOUNF_QUEUE, + CHANNEL_OUTBOUND_QUEUE_MANAGER, /** * Decodes incoming messages into message frames * and filters them based on version supported diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java index 8febb158..fdcc1f3e 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.java @@ -163,14 +163,22 @@ abstract class AbstractOutboundQueueManager OF Plugin -> queue.close() + // -> queueHandler.onConnectionQueueChanged(null). The last call causes that no more entries are enqueued + // in the queue. super.channelInactive(ctx); LOG.debug("Channel {} initiating shutdown...", ctx.channel()); + // Then we start queue shutdown, start counting written messages (so that we don't keep sending messages + // indefinitely) and failing not completed entries. shuttingDown = true; final long entries = currentQueue.startShutdown(ctx.channel()); LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel()); + // Finally, we schedule flush task that will take care of unflushed entries. We also cover the case, + // when there is more than shutdownOffset messages enqueued in unflushed segments + // (AbstractStackedOutboundQueue#finishShutdown()). scheduleFlush(); } diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java index a32c1ad1..75963335 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java @@ -260,16 +260,31 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { final long xid = LAST_XID_OFFSET_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE); shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE); - return lockedShutdownFlush(); + // Fails all uncompleted entries, because they will never be completed due to disconnected channel. + return lockedFailSegments(uncompletedSegments.iterator()); } } + /** + * Checks if the shutdown is in final phase -> all allowed entries (number of entries < shutdownOffset) are flushed + * and fails all not completed entries (if in final phase) + * @return true if in final phase, false if a flush is needed + */ boolean finishShutdown() { + boolean needsFlush; synchronized (unflushedSegments) { - lockedShutdownFlush(); + // Fails all entries, that were flushed in shutdownOffset (became uncompleted) + // - they will never be completed due to disconnected channel. + lockedFailSegments(uncompletedSegments.iterator()); + // If no further flush is needed, than we fail all unflushed segments, so that each enqueued entry + // is reported as unsuccessful due to channel disconnection. No further entries should be enqueued + // by this time. + needsFlush = needsFlush(); + if (!needsFlush) { + lockedFailSegments(unflushedSegments.iterator()); + } } - - return !needsFlush(); + return !needsFlush; } protected OutboundQueueEntry getEntry(final Long xid) { @@ -302,22 +317,27 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { return fastSegment.getEntry(fastOffset); } + /** + * Fails not completed entries in segments and frees completed segments + * @param iterator list of segments to be failed + * @return number of failed entries + */ @GuardedBy("unflushedSegments") - private long lockedShutdownFlush() { + private long lockedFailSegments(Iterator iterator) { long entries = 0; // Fail all queues - final Iterator it = uncompletedSegments.iterator(); - while (it.hasNext()) { - final StackedSegment segment = it.next(); + while (iterator.hasNext()) { + final StackedSegment segment = iterator.next(); entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED); if (segment.isComplete()) { LOG.trace("Cleared segment {}", segment); - it.remove(); + iterator.remove(); } } return entries; } + } diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java index 37635c03..7c10be16 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java @@ -203,7 +203,11 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i outputManager = ret; /* we don't need it anymore */ channel.pipeline().remove(output); - channel.pipeline().addLast(outputManager); + // OutboundQueueManager is put before DelegatingInboundHandler because otherwise channelInactive event would + // be first processed in OutboundQueueManager and then in ConnectionAdapter (and Openflowplugin). This might + // cause problems because we are shutting down the queue before Openflowplugin knows about it. + channel.pipeline().addBefore(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(), + PipelineHandlers.CHANNEL_OUTBOUND_QUEUE_MANAGER.name(), outputManager); return new OutboundQueueHandlerRegistrationImpl(handler) { @Override