Use ByteBuf.readRetainedSlice()
[openflowplugin.git] / openflowjava / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import java.net.InetSocketAddress;
12 import java.util.concurrent.TimeUnit;
13 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
14 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
16 import org.opendaylight.yangtools.yang.common.Uint32;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 final class OutboundQueueManager<T extends OutboundQueueHandler> extends
21         AbstractOutboundQueueManager<T, StackedOutboundQueue> {
22     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
23
24     private final int maxNonBarrierMessages;
25     private final long maxBarrierNanos;
26
27     // Updated from netty only
28     private boolean barrierTimerEnabled;
29     private long lastBarrierNanos = System.nanoTime();
30     private int nonBarrierMessages;
31
32     // Passed to executor to request a periodic barrier check
33     private final Runnable barrierRunnable = this::barrier;
34
35     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
36         final int maxNonBarrierMessages, final long maxBarrierNanos) {
37         super(parent, address, handler);
38         Preconditions.checkArgument(maxNonBarrierMessages > 0);
39         this.maxNonBarrierMessages = maxNonBarrierMessages;
40         Preconditions.checkArgument(maxBarrierNanos > 0);
41         this.maxBarrierNanos = maxBarrierNanos;
42     }
43
44     @Override
45     protected StackedOutboundQueue initializeStackedOutboudnqueue() {
46         return new StackedOutboundQueue(this);
47     }
48
49     private void scheduleBarrierTimer(final long now) {
50         long next = lastBarrierNanos + maxBarrierNanos;
51         if (next < now) {
52             LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
53             next = now + maxBarrierNanos;
54         }
55
56         final long delay = next - now;
57         LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
58         parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
59         barrierTimerEnabled = true;
60     }
61
62     private void scheduleBarrierMessage() {
63         final Uint32 xid = currentQueue.reserveBarrierIfNeeded();
64         if (xid == null) {
65             LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
66             return;
67         }
68
69         currentQueue.commitEntry(xid, getHandler().createBarrierRequest(xid), null);
70         LOG.trace("Barrier XID {} scheduled", xid);
71     }
72
73
74     /**
75      * Periodic barrier check.
76      */
77     protected void barrier() {
78         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
79         barrierTimerEnabled = false;
80         if (shuttingDown) {
81             LOG.trace("Channel shut down, not processing barrier");
82             return;
83         }
84
85         if (currentQueue.isBarrierNeeded()) {
86             LOG.trace("Sending a barrier message");
87             scheduleBarrierMessage();
88         } else {
89             LOG.trace("Barrier not needed, not issuing one");
90         }
91     }
92
93     /**
94      * Write a message into the underlying channel.
95      *
96      * @param now Time reference for 'now'. We take this as an argument, as
97      *            we need a timestamp to mark barrier messages we see swinging
98      *            by. That timestamp does not need to be completely accurate,
99      *            hence we use the flush start time. Alternative would be to
100      *            measure System.nanoTime() for each barrier -- needlessly
101      *            adding overhead.
102      */
103     @Override
104     void writeMessage(final OfHeader message, final long now) {
105         super.writeMessage(message, now);
106         if (message instanceof BarrierInput) {
107             LOG.trace("Barrier message seen, resetting counters");
108             nonBarrierMessages = 0;
109             lastBarrierNanos = now;
110         } else {
111             nonBarrierMessages++;
112             if (nonBarrierMessages >= maxNonBarrierMessages) {
113                 LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
114                 scheduleBarrierMessage();
115             } else if (!barrierTimerEnabled) {
116                 scheduleBarrierTimer(now);
117             }
118         }
119     }
120 }