@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
- // First of all, delegates disconnect event notification into ConnectionAdapter -> OF Plugin -> queue.close()
- // -> queueHandler.onConnectionQueueChanged(null). The last call causes that no more entries are enqueued
- // in the queue.
super.channelInactive(ctx);
LOG.debug("Channel {} initiating shutdown...", ctx.channel());
- // Then we start queue shutdown, start counting written messages (so that we don't keep sending messages
- // indefinitely) and failing not completed entries.
shuttingDown = true;
final long entries = currentQueue.startShutdown(ctx.channel());
LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
- // Finally, we schedule flush task that will take care of unflushed entries. We also cover the case,
- // when there is more than shutdownOffset messages enqueued in unflushed segments
- // (AbstractStackedOutboundQueue#finishShutdown()).
scheduleFlush();
}
final long xid = LAST_XID_OFFSET_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
- // Fails all uncompleted entries, because they will never be completed due to disconnected channel.
- return lockedFailSegments(uncompletedSegments.iterator());
+ return lockedShutdownFlush();
}
}
- /**
- * Checks if the shutdown is in final phase -> all allowed entries (number of entries < shutdownOffset) are flushed
- * and fails all not completed entries (if in final phase)
- * @return true if in final phase, false if a flush is needed
- */
boolean finishShutdown() {
- boolean needsFlush;
synchronized (unflushedSegments) {
- // Fails all entries, that were flushed in shutdownOffset (became uncompleted)
- // - they will never be completed due to disconnected channel.
- lockedFailSegments(uncompletedSegments.iterator());
- // If no further flush is needed, than we fail all unflushed segments, so that each enqueued entry
- // is reported as unsuccessful due to channel disconnection. No further entries should be enqueued
- // by this time.
- needsFlush = needsFlush();
- if (!needsFlush) {
- lockedFailSegments(unflushedSegments.iterator());
- }
+ lockedShutdownFlush();
}
- return !needsFlush;
+
+ return !needsFlush();
}
protected OutboundQueueEntry getEntry(final Long xid) {
return fastSegment.getEntry(fastOffset);
}
- /**
- * Fails not completed entries in segments and frees completed segments
- * @param iterator list of segments to be failed
- * @return number of failed entries
- */
@GuardedBy("unflushedSegments")
- private long lockedFailSegments(Iterator<StackedSegment> iterator) {
+ private long lockedShutdownFlush() {
long entries = 0;
// Fail all queues
- while (iterator.hasNext()) {
- final StackedSegment segment = iterator.next();
+ final Iterator<StackedSegment> it = uncompletedSegments.iterator();
+ while (it.hasNext()) {
+ final StackedSegment segment = it.next();
entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
if (segment.isComplete()) {
LOG.trace("Cleared segment {}", segment);
- iterator.remove();
+ it.remove();
}
}
return entries;
}
-
}
outputManager = ret;
/* we don't need it anymore */
channel.pipeline().remove(output);
- // OutboundQueueManager is put before DelegatingInboundHandler because otherwise channelInactive event would
- // be first processed in OutboundQueueManager and then in ConnectionAdapter (and Openflowplugin). This might
- // cause problems because we are shutting down the queue before Openflowplugin knows about it.
- channel.pipeline().addBefore(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),
- PipelineHandlers.CHANNEL_OUTBOUND_QUEUE_MANAGER.name(), outputManager);
+ channel.pipeline().addLast(outputManager);
return new OutboundQueueHandlerRegistrationImpl<T>(handler) {
@Override