90db23da65d3dfa37b245edf3ce1b6af5d7fd951
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import java.net.InetSocketAddress;
12 import java.util.concurrent.TimeUnit;
13 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
14 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 final class OutboundQueueManager<T extends OutboundQueueHandler> extends
20         AbstractOutboundQueueManager<T, StackedOutboundQueue> {
21     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
22
23     private final int maxNonBarrierMessages;
24     private final long maxBarrierNanos;
25
26     // Updated from netty only
27     private boolean barrierTimerEnabled;
28     private long lastBarrierNanos = System.nanoTime();
29     private int nonBarrierMessages;
30
31     // Passed to executor to request a periodic barrier check
32     private final Runnable barrierRunnable = new Runnable() {
33         @Override
34         public void run() {
35             barrier();
36         }
37     };
38
39     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
40         final int maxNonBarrierMessages, final long maxBarrierNanos) {
41         super(parent, address, handler);
42         Preconditions.checkArgument(maxNonBarrierMessages > 0);
43         this.maxNonBarrierMessages = maxNonBarrierMessages;
44         Preconditions.checkArgument(maxBarrierNanos > 0);
45         this.maxBarrierNanos = maxBarrierNanos;
46     }
47
48     @Override
49     protected StackedOutboundQueue initializeStackedOutboudnqueue() {
50         return new StackedOutboundQueue(this);
51     }
52
53     private void scheduleBarrierTimer(final long now) {
54         long next = lastBarrierNanos + maxBarrierNanos;
55         if (next < now) {
56             LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
57             next = now + maxBarrierNanos;
58         }
59
60         final long delay = next - now;
61         LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
62         parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
63         barrierTimerEnabled = true;
64     }
65
66     private void scheduleBarrierMessage() {
67         final Long xid = currentQueue.reserveBarrierIfNeeded();
68         if (xid == null) {
69             LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
70             return;
71         }
72
73         currentQueue.commitEntry(xid, getHandler().createBarrierRequest(xid), null);
74         LOG.trace("Barrier XID {} scheduled", xid);
75     }
76
77
78     /**
79      * Periodic barrier check.
80      */
81     protected void barrier() {
82         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
83         barrierTimerEnabled = false;
84         if (shuttingDown) {
85             LOG.trace("Channel shut down, not processing barrier");
86             return;
87         }
88
89         final long now = System.nanoTime();
90         final long sinceLast = now - lastBarrierNanos;
91         if (sinceLast >= maxBarrierNanos) {
92             LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
93             // FIXME: we should be tracking requests/responses instead of this
94             if (nonBarrierMessages == 0) {
95                 LOG.trace("No messages written since last barrier, not issuing one");
96             } else {
97                 scheduleBarrierMessage();
98             }
99         }
100     }
101
102     /**
103      * Write a message into the underlying channel.
104      *
105      * @param now Time reference for 'now'. We take this as an argument, as
106      *            we need a timestamp to mark barrier messages we see swinging
107      *            by. That timestamp does not need to be completely accurate,
108      *            hence we use the flush start time. Alternative would be to
109      *            measure System.nanoTime() for each barrier -- needlessly
110      *            adding overhead.
111      */
112     @Override
113     void writeMessage(final OfHeader message, final long now) {
114         super.writeMessage(message, now);
115         if (message instanceof BarrierInput) {
116             LOG.trace("Barrier message seen, resetting counters");
117             nonBarrierMessages = 0;
118             lastBarrierNanos = now;
119         } else {
120             nonBarrierMessages++;
121             if (nonBarrierMessages >= maxNonBarrierMessages) {
122                 LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
123                 scheduleBarrierMessage();
124             } else if (!barrierTimerEnabled) {
125                 scheduleBarrierTimer(now);
126             }
127         }
128     }
129 }