import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private boolean barrierTimerEnabled;
private int nonBarrierMessages;
private long lastXid = 0;
+ private Integer shutdownOffset;
// Passed to executor to request triggering of flush
private final Runnable flushRunnable = new Runnable() {
flush();
}
};
+
+ // Passed to executor to request a periodic barrier check
private final Runnable barrierRunnable = new Runnable() {
@Override
public void run() {
private void retireQueue(final OutboundQueueImpl queue) {
if (queueCache.offer(queue)) {
- LOG.debug("Saving queue {} for later reuse", queue);
+ LOG.trace("Saving queue {} for later reuse", queue);
} else {
- LOG.debug("Queue {} thrown away", queue);
+ LOG.trace("Queue {} thrown away", queue);
}
}
final OutboundQueueImpl queue;
if (cached != null) {
queue = cached.reuse(baseXid);
- LOG.debug("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
+ LOG.trace("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
} else {
queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
- LOG.debug("Allocated new queue {} on channel {}", queue, parent.getChannel());
+ LOG.trace("Allocated new queue {} on channel {}", queue, parent.getChannel());
}
activeQueues.add(queue);
private void scheduleBarrierTimer(final long now) {
long next = lastBarrierNanos + maxBarrierNanos;
if (next < now) {
- LOG.debug("Attempted to schedule barrier in the past, reset maximum)");
+ LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
next = now + maxBarrierNanos;
}
final long delay = next - now;
- LOG.debug("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
+ LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
barrierTimerEnabled = true;
}
private void scheduleBarrierMessage() {
final Long xid = currentQueue.reserveBarrierIfNeeded();
if (xid == null) {
- LOG.debug("Queue {} already contains a barrier, not scheduling one", currentQueue);
+ LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
return;
}
currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
- LOG.debug("Barrier XID {} scheduled", xid);
+ LOG.trace("Barrier XID {} scheduled", xid);
}
/**
}
if (message instanceof BarrierInput) {
- LOG.debug("Barrier message seen, resetting counters");
+ LOG.trace("Barrier message seen, resetting counters");
nonBarrierMessages = 0;
lastBarrierNanos = now;
} else {
nonBarrierMessages++;
if (nonBarrierMessages >= queueSize) {
- LOG.debug("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
+ LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
scheduleBarrierMessage();
} else if (!barrierTimerEnabled) {
scheduleBarrierTimer(now);
* @return True if the message matched a previous request, false otherwise.
*/
boolean onMessage(final OfHeader message) {
- LOG.debug("Attempting to pair message {} to a request", message);
+ LOG.trace("Attempting to pair message {} to a request", message);
Iterator<OutboundQueueImpl> it = activeQueues.iterator();
while (it.hasNext()) {
continue;
}
- LOG.debug("Queue {} accepted response {}", queue, message);
+ LOG.trace("Queue {} accepted response {}", queue, message);
// This has been a barrier request, we need to flush all
// previous queues
if (entry.isBarrier() && activeQueues.size() > 1) {
- LOG.debug("Queue {} indicated request was a barrier", queue);
+ LOG.trace("Queue {} indicated request was a barrier", queue);
it = activeQueues.iterator();
while (it.hasNext()) {
// We want to complete all queues before the current one, we will
// complete the current queue below
if (!queue.equals(q)) {
- LOG.debug("Queue {} is implied finished", q);
+ LOG.trace("Queue {} is implied finished", q);
q.completeAll();
it.remove();
retireQueue(q);
}
if (queue.isFinished()) {
- LOG.debug("Queue {} is finished", queue);
+ LOG.trace("Queue {} is finished", queue);
it.remove();
retireQueue(queue);
}
}
private void scheduleFlush() {
- if (parent.getChannel().isWritable()) {
- if (flushScheduled.compareAndSet(false, true)) {
- LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
- parent.getChannel().eventLoop().execute(flushRunnable);
- } else {
- LOG.trace("Flush task is already present on channel {}", parent.getChannel());
- }
+ if (flushScheduled.compareAndSet(false, true)) {
+ LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
+ parent.getChannel().eventLoop().execute(flushRunnable);
} else {
- LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel());
+ LOG.trace("Flush task is already present on channel {}", parent.getChannel());
}
}
protected void barrier() {
LOG.debug("Channel {} barrier timer expired", parent.getChannel());
barrierTimerEnabled = false;
- if (currentQueue == null) {
- LOG.debug("Channel shut down, not processing barrier");
+ if (shutdownOffset != null) {
+ LOG.trace("Channel shut down, not processing barrier");
return;
}
LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
// FIXME: we should be tracking requests/responses instead of this
if (nonBarrierMessages == 0) {
- LOG.debug("No messages written since last barrier, not issuing one");
+ LOG.trace("No messages written since last barrier, not issuing one");
} else {
scheduleBarrierMessage();
}
}
}
+ private void rescheduleFlush() {
+ /*
+ * We are almost ready to terminate. This is a bit tricky, because
+ * we do not want to have a race window where a message would be
+ * stuck on the queue without a flush being scheduled.
+ *
+ * So we mark ourselves as not running and then re-check if a
+ * flush out is needed. That will re-synchronized with other threads
+ * such that only one flush is scheduled at any given time.
+ */
+ if (!flushScheduled.compareAndSet(true, false)) {
+ LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
+ }
+
+ conditionalFlush();
+ }
+
+ private void shutdownFlush() {
+ long entries = 0;
+
+ // Fail all queues
+ final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
+ while (it.hasNext()) {
+ final OutboundQueueImpl queue = it.next();
+
+ entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
+ if (queue.isFinished()) {
+ LOG.trace("Cleared queue {}", queue);
+ it.remove();
+ }
+ }
+
+ LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
+
+ Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet");
+ if (currentQueue.isShutdown(shutdownOffset)) {
+ currentQueue = null;
+ handler.onConnectionQueueChanged(null);
+ LOG.debug("Channel {} shutdown complete", parent.getChannel());
+ } else {
+ LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
+ rescheduleFlush();
+ }
+ }
+
/**
* Perform a single flush operation.
*/
protected void flush() {
+ // If the channel is gone, just flush whatever is not completed
+ if (shutdownOffset != null) {
+ shutdownFlush();
+ return;
+ }
+
final long start = System.nanoTime();
final long deadline = start + maxWorkTime;
LOG.debug("Flushed {} messages in {}us to channel {}",
messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
- /*
- * We are almost ready to terminate. This is a bit tricky, because
- * we do not want to have a race window where a message would be
- * stuck on the queue without a flush being scheduled.
- *
- * So we mark ourselves as not running and then re-check if a
- * flush out is needed. That will re-synchronized with other threads
- * such that only one flush is scheduled at any given time.
- */
- if (!flushScheduled.compareAndSet(true, false)) {
- LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
- }
-
- conditionalFlush();
+ rescheduleFlush();
}
-
/**
* Schedule a queue flush if it is not empty and the channel is found
* to be writable. May only be called from Netty context.
*/
private void conditionalFlush() {
- if (!currentQueue.isEmpty()) {
+ if (currentQueue.needsFlush() && (shutdownOffset != null || parent.getChannel().isWritable())) {
scheduleFlush();
} else {
LOG.trace("Queue is empty, no flush needed");
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
- long entries = 0;
- LOG.debug("Channel shutdown, flushing queue...");
- handler.onConnectionQueueChanged(null);
+ LOG.debug("Channel {} initiating shutdown...", parent.getChannel());
- final Throwable cause = new RejectedExecutionException("Channel disconnected");
- for (OutboundQueueImpl queue : activeQueues) {
- entries += queue.failAll(cause);
- }
- activeQueues.clear();
-
- LOG.debug("Flushed {} queue entries", entries);
+ /*
+ * We are dealing with a multi-threaded shutdown, as the user may still
+ * be reserving entries in the queue. We are executing in a netty thread,
+ * so neither flush nor barrier can be running, which is good news.
+ *
+ * We will eat up all the slots in the queue here and mark the offset first
+ * reserved offset and free up all the cached queues. We then schedule
+ * the flush task, which will deal with the rest of the shutdown process.
+ */
+ shutdownOffset = currentQueue.startShutdown();
+ queueCache.clear();
+ LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
+ scheduleFlush();
}
@Override
public String toString() {
return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
}
+
+ void onEchoRequest(final EchoRequestMessage message) {
+ final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()).setVersion(message.getVersion()).setXid(message.getXid()).build();
+ parent.getChannel().writeAndFlush(reply);
+ }
}