BUG-3219: Introduce OutboundQueueHandler and related interfaces
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueImpl.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.FutureCallback;
12 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
13 import java.util.concurrent.locks.LockSupport;
14 import javax.annotation.Nonnull;
15 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 final class OutboundQueueImpl implements OutboundQueue {
21     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class);
22     private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> CURRENT_OFFSET_UPDATER =
23             AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
24     private static final long FLUSH_RETRY_NANOS = 1L;
25     private final OutboundQueueManager<?> manager;
26     private final OutboundQueueEntry[] queue;
27     private final long baseXid;
28     private final long endXid;
29     private final int reserve;
30
31     // Updated concurrently
32     private volatile int reserveOffset;
33
34     // Updated from Netty only
35     private int flushOffset;
36     private int completeCount;
37
38     OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final int maxQueue) {
39         /*
40          * We use the last entry as an emergency should a timeout-triggered
41          * flush request race with normal users for the last entry in this
42          * queue. In that case the flush request will take the last entry and
43          * schedule a flush, which means that we will get around sending the
44          * message as soon as the user finishes the reservation.
45          */
46         Preconditions.checkArgument(maxQueue > 1);
47         this.baseXid = baseXid;
48         this.endXid = baseXid + maxQueue;
49         this.reserve = maxQueue - 1;
50         this.manager = Preconditions.checkNotNull(manager);
51         queue = new OutboundQueueEntry[maxQueue];
52         for (int i = 0; i < maxQueue; ++i) {
53             queue[i] = new OutboundQueueEntry();
54         }
55     }
56
57     private OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final OutboundQueueEntry[] queue) {
58         this.manager = Preconditions.checkNotNull(manager);
59         this.queue = Preconditions.checkNotNull(queue);
60         this.baseXid = baseXid;
61         this.endXid = baseXid + queue.length;
62         this.reserve = queue.length - 1;
63         for (OutboundQueueEntry element : queue) {
64             element.reset();
65         }
66     }
67
68     OutboundQueueImpl reuse(final long baseXid) {
69         return new OutboundQueueImpl(manager, baseXid, queue);
70     }
71
72     Long reserveEntry(final boolean forBarrier) {
73         final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this);
74         if (offset >= reserve) {
75             if (forBarrier) {
76                 LOG.debug("Queue {} offset {}/{}, using emergency slot", this, offset, queue.length);
77                 return endXid;
78             } else {
79                 LOG.debug("Queue {} offset {}/{}, not allowing reservation", this, offset, queue.length);
80                 return null;
81             }
82         }
83
84         final Long xid = baseXid + offset;
85         LOG.debug("Queue {} allocated XID {} at offset {}", this, xid, offset);
86         return xid;
87     }
88
89     @Override
90     public Long reserveEntry() {
91         return reserveEntry(false);
92     }
93
94     @Override
95     public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
96         final int offset = (int)(xid - baseXid);
97         if (message != null) {
98             Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
99         }
100
101         queue[offset].commit(message, callback);
102         LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
103
104         manager.ensureFlushing(this);
105     }
106
107     /**
108      * An empty queue is a queue which has no further unflushed entries.
109      *
110      * @return True if this queue does not have unprocessed entries.
111      */
112     boolean isEmpty() {
113         int ro = reserveOffset;
114         if (ro >= reserve) {
115             if (queue[reserve].isCommitted()) {
116                 ro = reserve + 1;
117             } else {
118                 ro = reserve;
119             }
120         }
121
122         LOG.debug("Effective flush/reserve offset {}/{}", flushOffset, ro);
123         return ro <= flushOffset;
124     }
125
126     /**
127      * A queue is finished when all of its entries have been completed.
128      *
129      * @return False if there are any uncompleted requests.
130      */
131     boolean isFinished() {
132         if (completeCount < reserve) {
133             return false;
134         }
135
136         // We need to check if the last entry was used
137         final OutboundQueueEntry last = queue[reserve];
138         return !last.isCommitted() || last.isCompleted();
139     }
140
141     boolean isFlushed() {
142         LOG.debug("Check queue {} for completeness (offset {}, reserve {})", flushOffset, reserve);
143         if (flushOffset < reserve) {
144             return false;
145         }
146
147         // flushOffset implied == reserve
148         return flushOffset >= queue.length || !queue[reserve].isCommitted();
149     }
150
151     OfHeader flushEntry() {
152         for (;;) {
153             // No message ready
154             if (isEmpty()) {
155                 LOG.debug("Flush offset {} is uptodate with reserved", flushOffset);
156                 return null;
157             }
158
159             boolean retry = true;
160             while (!queue[flushOffset].isCommitted()) {
161                 if (!retry) {
162                     LOG.debug("Offset {} not ready yet, giving up", flushOffset);
163                     return null;
164                 }
165
166                 LOG.debug("Offset {} not ready yet, retrying", flushOffset);
167                 LockSupport.parkNanos(FLUSH_RETRY_NANOS);
168                 retry = false;
169             }
170
171             final OfHeader msg = queue[flushOffset++].getMessage();
172             if (msg != null) {
173                 return msg;
174             }
175         }
176     }
177
178     private boolean xidInRance(final long xid) {
179         return xid < endXid && (xid >= baseXid || baseXid > endXid);
180     }
181
182     /**
183      * Return the request entry corresponding to a response. Returns null
184      * if there is no request matching the response.
185      *
186      * @param response Response message
187      * @return Matching request entry, or null if no match is found.
188      */
189     OutboundQueueEntry pairRequest(@Nonnull final OfHeader response) {
190         final Long xid = response.getXid();
191         if (!xidInRance(xid)) {
192             LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, queue.length, xid);
193             return null;
194         }
195
196         final int offset = (int)(xid - baseXid);
197         final OutboundQueueEntry entry = queue[offset];
198         if (entry.isCompleted()) {
199             LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
200             return null;
201         }
202
203         if (entry.complete(response)) {
204             completeCount++;
205
206             // This has been a barrier -- make sure we complete all preceding requests
207             if (entry.isBarrier()) {
208                 LOG.debug("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid, xid - 1);
209                 for (int i = 0; i < offset; ++i) {
210                     final OutboundQueueEntry e = queue[i];
211                     if (!e.isCompleted() && e.complete(null)) {
212                         completeCount++;
213                     }
214                 }
215             }
216         }
217         return entry;
218     }
219
220     void completeAll() {
221         for (OutboundQueueEntry entry : queue) {
222             if (!entry.isCompleted() && entry.complete(null)) {
223                 completeCount++;
224             }
225         }
226     }
227
228     int failAll(final Throwable cause) {
229         int ret = 0;
230         for (OutboundQueueEntry entry : queue) {
231             if (!entry.isCompleted()) {
232                 entry.fail(cause);
233                 ret++;
234             }
235         }
236
237         return ret;
238     }
239 }