@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ // First of all, delegates disconnect event notification into ConnectionAdapter -> OF Plugin -> queue.close()
+ // -> queueHandler.onConnectionQueueChanged(null). The last call causes that no more entries are enqueued
+ // in the queue.
super.channelInactive(ctx);
LOG.debug("Channel {} initiating shutdown...", ctx.channel());
+ // Then we start queue shutdown, start counting written messages (so that we don't keep sending messages
+ // indefinitely) and failing not completed entries.
shuttingDown = true;
final long entries = currentQueue.startShutdown(ctx.channel());
LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
+ // Finally, we schedule flush task that will take care of unflushed entries. We also cover the case,
+ // when there is more than shutdownOffset messages enqueued in unflushed segments
+ // (AbstractStackedOutboundQueue#finishShutdown()).
scheduleFlush();
}
final long xid = LAST_XID_OFFSET_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
- return lockedShutdownFlush();
+ // Fails all uncompleted entries, because they will never be completed due to disconnected channel.
+ return lockedFailSegments(uncompletedSegments.iterator());
}
}
+ /**
+ * Checks if the shutdown is in final phase -> all allowed entries (number of entries < shutdownOffset) are flushed
+ * and fails all not completed entries (if in final phase)
+ * @return true if in final phase, false if a flush is needed
+ */
boolean finishShutdown() {
+ boolean needsFlush;
synchronized (unflushedSegments) {
- lockedShutdownFlush();
+ // Fails all entries, that were flushed in shutdownOffset (became uncompleted)
+ // - they will never be completed due to disconnected channel.
+ lockedFailSegments(uncompletedSegments.iterator());
+ // If no further flush is needed, than we fail all unflushed segments, so that each enqueued entry
+ // is reported as unsuccessful due to channel disconnection. No further entries should be enqueued
+ // by this time.
+ needsFlush = needsFlush();
+ if (!needsFlush) {
+ lockedFailSegments(unflushedSegments.iterator());
+ }
}
-
- return !needsFlush();
+ return !needsFlush;
}
protected OutboundQueueEntry getEntry(final Long xid) {
return fastSegment.getEntry(fastOffset);
}
+ /**
+ * Fails not completed entries in segments and frees completed segments
+ * @param iterator list of segments to be failed
+ * @return number of failed entries
+ */
@GuardedBy("unflushedSegments")
- private long lockedShutdownFlush() {
+ private long lockedFailSegments(Iterator<StackedSegment> iterator) {
long entries = 0;
// Fail all queues
- final Iterator<StackedSegment> it = uncompletedSegments.iterator();
- while (it.hasNext()) {
- final StackedSegment segment = it.next();
+ while (iterator.hasNext()) {
+ final StackedSegment segment = iterator.next();
entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
if (segment.isComplete()) {
LOG.trace("Cleared segment {}", segment);
- it.remove();
+ iterator.remove();
}
}
return entries;
}
+
}
outputManager = ret;
/* we don't need it anymore */
channel.pipeline().remove(output);
- channel.pipeline().addLast(outputManager);
+ // OutboundQueueManager is put before DelegatingInboundHandler because otherwise channelInactive event would
+ // be first processed in OutboundQueueManager and then in ConnectionAdapter (and Openflowplugin). This might
+ // cause problems because we are shutting down the queue before Openflowplugin knows about it.
+ channel.pipeline().addBefore(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),
+ PipelineHandlers.CHANNEL_OUTBOUND_QUEUE_MANAGER.name(), outputManager);
return new OutboundQueueHandlerRegistrationImpl<T>(handler) {
@Override