import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private boolean barrierTimerEnabled;
private int nonBarrierMessages;
private long lastXid = 0;
+ private Integer shutdownOffset;
// Passed to executor to request triggering of flush
private final Runnable flushRunnable = new Runnable() {
flush();
}
};
+
+ // Passed to executor to request a periodic barrier check
private final Runnable barrierRunnable = new Runnable() {
@Override
public void run() {
private void retireQueue(final OutboundQueueImpl queue) {
if (queueCache.offer(queue)) {
+ queue.retire();
LOG.trace("Saving queue {} for later reuse", queue);
} else {
LOG.trace("Queue {} thrown away", queue);
}
private void scheduleFlush() {
- if (parent.getChannel().isWritable()) {
- if (flushScheduled.compareAndSet(false, true)) {
- LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
- parent.getChannel().eventLoop().execute(flushRunnable);
- } else {
- LOG.trace("Flush task is already present on channel {}", parent.getChannel());
- }
+ if (flushScheduled.compareAndSet(false, true)) {
+ LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
+ parent.getChannel().eventLoop().execute(flushRunnable);
} else {
- LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel());
+ LOG.trace("Flush task is already present on channel {}", parent.getChannel());
}
}
protected void barrier() {
LOG.debug("Channel {} barrier timer expired", parent.getChannel());
barrierTimerEnabled = false;
- if (currentQueue == null) {
+ if (shutdownOffset != null) {
LOG.trace("Channel shut down, not processing barrier");
return;
}
}
}
+ private void rescheduleFlush() {
+ /*
+ * We are almost ready to terminate. This is a bit tricky, because
+ * we do not want to have a race window where a message would be
+ * stuck on the queue without a flush being scheduled.
+ *
+ * So we mark ourselves as not running and then re-check if a
+ * flush out is needed. That will re-synchronized with other threads
+ * such that only one flush is scheduled at any given time.
+ */
+ if (!flushScheduled.compareAndSet(true, false)) {
+ LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
+ }
+
+ conditionalFlush();
+ }
+
+ private void shutdownFlush() {
+ long entries = 0;
+
+ // Fail all queues
+ final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
+ while (it.hasNext()) {
+ final OutboundQueueImpl queue = it.next();
+
+ entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
+ if (queue.isFinished()) {
+ LOG.trace("Cleared queue {}", queue);
+ it.remove();
+ }
+ }
+
+ LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
+
+ Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet");
+ if (currentQueue.isShutdown(shutdownOffset)) {
+ currentQueue = null;
+ handler.onConnectionQueueChanged(null);
+ LOG.debug("Channel {} shutdown complete", parent.getChannel());
+ } else {
+ LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
+ rescheduleFlush();
+ }
+ }
+
/**
* Perform a single flush operation.
*/
protected void flush() {
+ // If the channel is gone, just flush whatever is not completed
+ if (shutdownOffset != null) {
+ shutdownFlush();
+ return;
+ }
+
final long start = System.nanoTime();
final long deadline = start + maxWorkTime;
LOG.debug("Flushed {} messages in {}us to channel {}",
messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
- /*
- * We are almost ready to terminate. This is a bit tricky, because
- * we do not want to have a race window where a message would be
- * stuck on the queue without a flush being scheduled.
- *
- * So we mark ourselves as not running and then re-check if a
- * flush out is needed. That will re-synchronized with other threads
- * such that only one flush is scheduled at any given time.
- */
- if (!flushScheduled.compareAndSet(true, false)) {
- LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
- }
-
- conditionalFlush();
+ rescheduleFlush();
}
-
/**
* Schedule a queue flush if it is not empty and the channel is found
* to be writable. May only be called from Netty context.
*/
private void conditionalFlush() {
- if (!currentQueue.isEmpty()) {
+ if (currentQueue.needsFlush() && (shutdownOffset != null || parent.getChannel().isWritable())) {
scheduleFlush();
} else {
LOG.trace("Queue is empty, no flush needed");
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
- long entries = 0;
- LOG.debug("Channel shutdown, flushing queue...");
- handler.onConnectionQueueChanged(null);
-
- for (OutboundQueueImpl queue : activeQueues) {
- entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
- }
- activeQueues.clear();
+ LOG.debug("Channel {} initiating shutdown...", parent.getChannel());
- LOG.debug("Flushed {} queue entries", entries);
+ /*
+ * We are dealing with a multi-threaded shutdown, as the user may still
+ * be reserving entries in the queue. We are executing in a netty thread,
+ * so neither flush nor barrier can be running, which is good news.
+ *
+ * We will eat up all the slots in the queue here and mark the offset first
+ * reserved offset and free up all the cached queues. We then schedule
+ * the flush task, which will deal with the rest of the shutdown process.
+ */
+ shutdownOffset = currentQueue.startShutdown();
+ queueCache.clear();
+ LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
+ scheduleFlush();
}
@Override
public String toString() {
return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
}
+
+ void onEchoRequest(final EchoRequestMessage message) {
+ final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()).setVersion(message.getVersion()).setXid(message.getXid()).build();
+ parent.getChannel().writeAndFlush(reply);
+ }
}