Enable periodic barrier only when needed
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueImpl.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.FutureCallback;
12 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
13 import java.util.concurrent.locks.LockSupport;
14 import javax.annotation.Nonnull;
15 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 final class OutboundQueueImpl implements OutboundQueue {
21     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class);
22     private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> CURRENT_OFFSET_UPDATER =
23             AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
24     private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> BARRIER_OFFSET_UPDATER =
25             AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "barrierOffset");
26     private static final long FLUSH_RETRY_NANOS = 1L;
27     private final OutboundQueueManager<?> manager;
28     private final OutboundQueueEntry[] queue;
29     private final long baseXid;
30     private final long endXid;
31     private final int reserve;
32
33     // Updated concurrently
34     private volatile int barrierOffset = -1;
35     private volatile int reserveOffset = 0;
36
37     // Updated from Netty only
38     private int flushOffset;
39     private int completeCount;
40
41     OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final int maxQueue) {
42         /*
43          * We use the last entry as an emergency should a timeout-triggered
44          * flush request race with normal users for the last entry in this
45          * queue. In that case the flush request will take the last entry and
46          * schedule a flush, which means that we will get around sending the
47          * message as soon as the user finishes the reservation.
48          */
49         Preconditions.checkArgument(maxQueue > 1);
50         this.baseXid = baseXid;
51         this.endXid = baseXid + maxQueue;
52         this.reserve = maxQueue - 1;
53         this.manager = Preconditions.checkNotNull(manager);
54         queue = new OutboundQueueEntry[maxQueue];
55         for (int i = 0; i < maxQueue; ++i) {
56             queue[i] = new OutboundQueueEntry();
57         }
58     }
59
60     private OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final OutboundQueueEntry[] queue) {
61         this.manager = Preconditions.checkNotNull(manager);
62         this.queue = Preconditions.checkNotNull(queue);
63         this.baseXid = baseXid;
64         this.endXid = baseXid + queue.length;
65         this.reserve = queue.length - 1;
66         for (OutboundQueueEntry element : queue) {
67             element.reset();
68         }
69     }
70
71     OutboundQueueImpl reuse(final long baseXid) {
72         return new OutboundQueueImpl(manager, baseXid, queue);
73     }
74
75     @Override
76     public Long reserveEntry() {
77         return reserveEntry(false);
78     }
79
80     @Override
81     public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
82         final int offset = (int)(xid - baseXid);
83         if (message != null) {
84             Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
85         }
86
87         final OutboundQueueEntry entry = queue[offset];
88         entry.commit(message, callback);
89         LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
90
91         if (entry.isBarrier()) {
92             int my = offset;
93             for (;;) {
94                 final int prev = BARRIER_OFFSET_UPDATER.getAndSet(this, my);
95                 if (prev < my) {
96                     LOG.debug("Queue {} recorded pending barrier offset {}", this, my);
97                     break;
98                 }
99
100                 // We have traveled back, recover
101                 my = prev;
102             }
103         }
104
105         manager.ensureFlushing(this);
106     }
107
108     private Long reserveEntry(final boolean forBarrier) {
109         final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this);
110         if (offset >= reserve) {
111             if (forBarrier) {
112                 LOG.debug("Queue {} offset {}/{}, using emergency slot", this, offset, queue.length);
113                 return endXid;
114             } else {
115                 LOG.debug("Queue {} offset {}/{}, not allowing reservation", this, offset, queue.length);
116                 return null;
117             }
118         }
119
120         final Long xid = baseXid + offset;
121         LOG.debug("Queue {} allocated XID {} at offset {}", this, xid, offset);
122         return xid;
123     }
124
125     Long reserveBarrierIfNeeded() {
126         final int bo = barrierOffset;
127         if (bo >= flushOffset) {
128             LOG.debug("Barrier found at offset {} (currently at {})", bo, flushOffset);
129             return null;
130         } else {
131             return reserveEntry(true);
132         }
133     }
134
135     /**
136      * An empty queue is a queue which has no further unflushed entries.
137      *
138      * @return True if this queue does not have unprocessed entries.
139      */
140     boolean isEmpty() {
141         int ro = reserveOffset;
142         if (ro >= reserve) {
143             if (queue[reserve].isCommitted()) {
144                 ro = reserve + 1;
145             } else {
146                 ro = reserve;
147             }
148         }
149
150         LOG.debug("Effective flush/reserve offset {}/{}", flushOffset, ro);
151         return ro <= flushOffset;
152     }
153
154     /**
155      * A queue is finished when all of its entries have been completed.
156      *
157      * @return False if there are any uncompleted requests.
158      */
159     boolean isFinished() {
160         if (completeCount < reserve) {
161             return false;
162         }
163
164         // We need to check if the last entry was used
165         final OutboundQueueEntry last = queue[reserve];
166         return !last.isCommitted() || last.isCompleted();
167     }
168
169     boolean isFlushed() {
170         LOG.debug("Check queue {} for completeness (offset {}, reserve {})", flushOffset, reserve);
171         if (flushOffset < reserve) {
172             return false;
173         }
174
175         // flushOffset implied == reserve
176         return flushOffset >= queue.length || !queue[reserve].isCommitted();
177     }
178
179     OfHeader flushEntry() {
180         for (;;) {
181             // No message ready
182             if (isEmpty()) {
183                 LOG.debug("Flush offset {} is uptodate with reserved", flushOffset);
184                 return null;
185             }
186
187             boolean retry = true;
188             while (!queue[flushOffset].isCommitted()) {
189                 if (!retry) {
190                     LOG.debug("Offset {} not ready yet, giving up", flushOffset);
191                     return null;
192                 }
193
194                 LOG.debug("Offset {} not ready yet, retrying", flushOffset);
195                 LockSupport.parkNanos(FLUSH_RETRY_NANOS);
196                 retry = false;
197             }
198
199             final OfHeader msg = queue[flushOffset++].getMessage();
200             if (msg != null) {
201                 return msg;
202             }
203         }
204     }
205
206     private boolean xidInRance(final long xid) {
207         return xid < endXid && (xid >= baseXid || baseXid > endXid);
208     }
209
210     /**
211      * Return the request entry corresponding to a response. Returns null
212      * if there is no request matching the response.
213      *
214      * @param response Response message
215      * @return Matching request entry, or null if no match is found.
216      */
217     OutboundQueueEntry pairRequest(@Nonnull final OfHeader response) {
218         final Long xid = response.getXid();
219         if (!xidInRance(xid)) {
220             LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, queue.length, xid);
221             return null;
222         }
223
224         final int offset = (int)(xid - baseXid);
225         final OutboundQueueEntry entry = queue[offset];
226         if (entry.isCompleted()) {
227             LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
228             return null;
229         }
230
231         if (entry.complete(response)) {
232             completeCount++;
233
234             // This has been a barrier -- make sure we complete all preceding requests
235             if (entry.isBarrier()) {
236                 LOG.debug("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid, xid - 1);
237                 for (int i = 0; i < offset; ++i) {
238                     final OutboundQueueEntry e = queue[i];
239                     if (!e.isCompleted() && e.complete(null)) {
240                         completeCount++;
241                     }
242                 }
243             }
244         }
245         return entry;
246     }
247
248     void completeAll() {
249         for (OutboundQueueEntry entry : queue) {
250             if (!entry.isCompleted() && entry.complete(null)) {
251                 completeCount++;
252             }
253         }
254     }
255
256     int failAll(final Throwable cause) {
257         int ret = 0;
258         for (OutboundQueueEntry entry : queue) {
259             if (!entry.isCompleted()) {
260                 entry.fail(cause);
261                 ret++;
262             }
263         }
264
265         return ret;
266     }
267 }