2 * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10 import com.google.common.base.Preconditions;
11 import java.net.InetSocketAddress;
12 import java.util.concurrent.TimeUnit;
13 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
14 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
19 final class OutboundQueueManager<T extends OutboundQueueHandler> extends
20 AbstractOutboundQueueManager<T, StackedOutboundQueue> {
21 private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
23 private final int maxNonBarrierMessages;
24 private final long maxBarrierNanos;
26 // Updated from netty only
27 private boolean barrierTimerEnabled;
28 private long lastBarrierNanos = System.nanoTime();
29 private int nonBarrierMessages;
31 // Passed to executor to request a periodic barrier check
32 private final Runnable barrierRunnable = new Runnable() {
39 OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
40 final int maxNonBarrierMessages, final long maxBarrierNanos) {
41 super(parent, address, handler);
42 Preconditions.checkArgument(maxNonBarrierMessages > 0);
43 this.maxNonBarrierMessages = maxNonBarrierMessages;
44 Preconditions.checkArgument(maxBarrierNanos > 0);
45 this.maxBarrierNanos = maxBarrierNanos;
49 protected StackedOutboundQueue initializeStackedOutboudnqueue() {
50 return new StackedOutboundQueue(this);
53 private void scheduleBarrierTimer(final long now) {
54 long next = lastBarrierNanos + maxBarrierNanos;
56 LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
57 next = now + maxBarrierNanos;
60 final long delay = next - now;
61 LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
62 parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
63 barrierTimerEnabled = true;
66 private void scheduleBarrierMessage() {
67 final Long xid = currentQueue.reserveBarrierIfNeeded();
69 LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
73 currentQueue.commitEntry(xid, getHandler().createBarrierRequest(xid), null);
74 LOG.trace("Barrier XID {} scheduled", xid);
79 * Periodic barrier check.
81 protected void barrier() {
82 LOG.debug("Channel {} barrier timer expired", parent.getChannel());
83 barrierTimerEnabled = false;
85 LOG.trace("Channel shut down, not processing barrier");
89 final long now = System.nanoTime();
90 final long sinceLast = now - lastBarrierNanos;
91 if (sinceLast >= maxBarrierNanos) {
92 LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
93 // FIXME: we should be tracking requests/responses instead of this
94 if (nonBarrierMessages == 0) {
95 LOG.trace("No messages written since last barrier, not issuing one");
97 scheduleBarrierMessage();
103 * Write a message into the underlying channel.
105 * @param now Time reference for 'now'. We take this as an argument, as
106 * we need a timestamp to mark barrier messages we see swinging
107 * by. That timestamp does not need to be completely accurate,
108 * hence we use the flush start time. Alternative would be to
109 * measure System.nanoTime() for each barrier -- needlessly
113 void writeMessage(final OfHeader message, final long now) {
114 super.writeMessage(message, now);
115 if (message instanceof BarrierInput) {
116 LOG.trace("Barrier message seen, resetting counters");
117 nonBarrierMessages = 0;
118 lastBarrierNanos = now;
120 nonBarrierMessages++;
121 if (nonBarrierMessages >= maxNonBarrierMessages) {
122 LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
123 scheduleBarrierMessage();
124 } else if (!barrierTimerEnabled) {
125 scheduleBarrierTimer(now);