Barrier turn on/off - StackedOutboundQueue definition
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / StackedOutboundQueue.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import com.google.common.util.concurrent.FutureCallback;
13 import io.netty.channel.Channel;
14 import java.util.Iterator;
15 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
16 import javax.annotation.Nonnull;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 final class StackedOutboundQueue extends AbstractStackedOutboundQueue {
23     private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class);
24     private static final AtomicLongFieldUpdater<StackedOutboundQueue> BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid");
25
26     private volatile long allocatedXid = -1;
27     private volatile long barrierXid = -1;
28
29     StackedOutboundQueue(final AbstractOutboundQueueManager<?, ?> manager) {
30         super(manager);
31     }
32
33     @GuardedBy("unflushedSegments")
34     private void ensureSegment(final StackedSegment first, final int offset) {
35         final int segmentOffset = offset / StackedSegment.SEGMENT_SIZE;
36         LOG.debug("Queue {} slow offset {} maps to {} segments {}", this, offset, segmentOffset, unflushedSegments.size());
37
38         for (int i = unflushedSegments.size(); i <= segmentOffset; ++i) {
39             final StackedSegment newSegment = StackedSegment.create(first.getBaseXid() + (StackedSegment.SEGMENT_SIZE * i));
40             LOG.debug("Adding segment {}", newSegment);
41             unflushedSegments.add(newSegment);
42         }
43
44         allocatedXid = uncompletedSegments.get(uncompletedSegments.size() - 1).getEndXid();
45     }
46
47     /*
48      * This method is expected to be called from multiple threads concurrently.
49      */
50     @Override
51     public Long reserveEntry() {
52         final long xid = LAST_XID_UPDATER.incrementAndGet(this);
53         final StackedSegment fastSegment = firstSegment;
54
55         if (xid >= fastSegment.getBaseXid() + StackedSegment.SEGMENT_SIZE) {
56            if (xid >= allocatedXid) {
57                 // Multiple segments, this a slow path
58                 LOG.debug("Queue {} falling back to slow reservation for XID {}", this, xid);
59
60                 synchronized (unflushedSegments) {
61                     LOG.debug("Queue {} executing slow reservation for XID {}", this, xid);
62
63                     // Shutdown was scheduled, need to fail the reservation
64                     if (shutdownOffset != null) {
65                         LOG.debug("Queue {} is being shutdown, failing reservation", this);
66                         return null;
67                     }
68
69                     // Ensure we have the appropriate segment for the specified XID
70                     final StackedSegment slowSegment = firstSegment;
71                     final int slowOffset = (int) (xid - slowSegment.getBaseXid());
72                     Verify.verify(slowOffset >= 0);
73
74                     // Now, we let's see if we need to allocate a new segment
75                     ensureSegment(slowSegment, slowOffset);
76
77                     LOG.debug("Queue {} slow reservation finished", this);
78                 }
79             } else {
80                 LOG.debug("Queue {} XID {} is already backed", this, xid);
81             }
82         }
83
84         LOG.trace("Queue {} allocated XID {}", this, xid);
85         return xid;
86     }
87
88     /*
89      * This method is expected to be called from multiple threads concurrently
90      */
91     @Override
92     public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
93         final StackedSegment fastSegment = firstSegment;
94         final long calcOffset = xid - fastSegment.getBaseXid();
95         Preconditions.checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s", xid, fastSegment.getBaseXid());
96
97         Verify.verify(calcOffset <= Integer.MAX_VALUE);
98         final int fastOffset = (int) calcOffset;
99
100         final OutboundQueueEntry entry;
101         if (fastOffset >= StackedSegment.SEGMENT_SIZE) {
102             LOG.debug("Queue {} falling back to slow commit of XID {} at offset {}", this, xid, fastOffset);
103
104             final StackedSegment segment;
105             final int slowOffset;
106             synchronized (unflushedSegments) {
107                 final StackedSegment slowSegment = firstSegment;
108                 final long slowCalcOffset = xid - slowSegment.getBaseXid();
109                 Verify.verify(slowCalcOffset >= 0 && slowCalcOffset <= Integer.MAX_VALUE);
110                 slowOffset = (int) slowCalcOffset;
111
112                 LOG.debug("Queue {} recalculated offset of XID {} to {}", this, xid, slowOffset);
113                 segment = unflushedSegments.get(slowOffset / StackedSegment.SEGMENT_SIZE);
114             }
115
116             final int segOffset = slowOffset % StackedSegment.SEGMENT_SIZE;
117             entry = segment.getEntry(segOffset);
118             LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this, xid, slowOffset, segment, segOffset);
119         } else {
120             entry = fastSegment.getEntry(fastOffset);
121         }
122
123         entry.commit(message, callback);
124         if (entry.isBarrier()) {
125             long my = xid;
126             for (;;) {
127                 final long prev = BARRIER_XID_UPDATER.getAndSet(this, my);
128                 if (prev < my) {
129                     LOG.debug("Queue {} recorded pending barrier XID {}", this, my);
130                     break;
131                 }
132
133                 // We have traveled back, recover
134                 LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
135                 my = prev;
136             }
137         }
138
139         LOG.trace("Queue {} committed XID {}", this, xid);
140         manager.ensureFlushing();
141     }
142
143     @Override
144     int writeEntries(@Nonnull final Channel channel, final long now) {
145         // Local cache
146         StackedSegment segment = firstSegment;
147         int entries = 0;
148
149         while (channel.isWritable()) {
150             final OutboundQueueEntry entry = segment.getEntry(flushOffset);
151             if (!entry.isCommitted()) {
152                 LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid() + flushOffset, segment, flushOffset);
153                 break;
154             }
155
156             LOG.trace("Queue {} flushing entry at offset {}", this, flushOffset);
157             final OfHeader message = entry.takeMessage();
158             flushOffset++;
159             entries++;
160
161             if (message != null) {
162                 manager.writeMessage(message, now);
163             } else {
164                 entry.complete(null);
165             }
166
167             if (flushOffset >= StackedSegment.SEGMENT_SIZE) {
168                 /*
169                  * Slow path: purge the current segment unless it's the last one.
170                  * If it is, we leave it for replacement when a new reservation
171                  * is run on it.
172                  *
173                  * This costs us two slow paths, but hey, this should be very rare,
174                  * so let's keep things simple.
175                  */
176                 synchronized (unflushedSegments) {
177                     LOG.debug("Flush offset {} unflushed segments {}", flushOffset, unflushedSegments.size());
178
179                     // We may have raced ahead of reservation code and need to allocate a segment
180                     ensureSegment(segment, flushOffset);
181
182                     // Remove the segment, update the firstSegment and reset flushOffset
183                     final StackedSegment oldSegment = unflushedSegments.remove(0);
184                     if (oldSegment.isComplete()) {
185                         uncompletedSegments.remove(oldSegment);
186                         oldSegment.recycle();
187                     }
188
189                     // Reset the first segment and add it to the uncompleted list
190                     segment = unflushedSegments.get(0);
191                     uncompletedSegments.add(segment);
192
193                     // Update the shutdown offset
194                     if (shutdownOffset != null) {
195                         shutdownOffset -= StackedSegment.SEGMENT_SIZE;
196                     }
197
198                     // Allow reservations back on the fast path by publishing the new first segment
199                     firstSegment = segment;
200
201                     flushOffset = 0;
202                     LOG.debug("Queue {} flush moved to segment {}", this, segment);
203                 }
204             }
205         }
206
207         return entries;
208     }
209
210     Long reserveBarrierIfNeeded() {
211         final long bXid = barrierXid;
212         final long fXid = firstSegment.getBaseXid() + flushOffset;
213         if (bXid >= fXid) {
214             LOG.debug("Barrier found at XID {} (currently at {})", bXid, fXid);
215             return null;
216         }
217         return reserveEntry();
218     }
219
220     @Override
221     boolean pairRequest(final OfHeader message) {
222         Iterator<StackedSegment> it = uncompletedSegments.iterator();
223         while (it.hasNext()) {
224             final StackedSegment queue = it.next();
225             final OutboundQueueEntry entry = queue.pairRequest(message);
226             if (entry == null) {
227                 continue;
228             }
229
230             LOG.trace("Queue {} accepted response {}", queue, message);
231
232             // This has been a barrier request, we need to flush all
233             // previous queues
234             if (entry.isBarrier() && uncompletedSegments.size() > 1) {
235                 LOG.trace("Queue {} indicated request was a barrier", queue);
236
237                 it = uncompletedSegments.iterator();
238                 while (it.hasNext()) {
239                     final StackedSegment q = it.next();
240
241                     // We want to complete all queues before the current one, we will
242                     // complete the current queue below
243                     if (!queue.equals(q)) {
244                         LOG.trace("Queue {} is implied finished", q);
245                         q.completeAll();
246                         it.remove();
247                         q.recycle();
248                     } else {
249                         break;
250                     }
251                 }
252             }
253
254             if (queue.isComplete()) {
255                 LOG.trace("Queue {} is finished", queue);
256                 it.remove();
257                 queue.recycle();
258             }
259
260             return true;
261         }
262
263         LOG.debug("Failed to find completion for message {}", message);
264         return false;
265     }
266 }