package org.opendaylight.openflowjava.protocol.impl.core.connection;
import com.google.common.util.concurrent.FutureCallback;
+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.Function;
+
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* This method is expected to be called from multiple threads concurrently
*/
@Override
- public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
+ public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback,
+ final Function<OfHeader, Boolean> isCompletedFunction) {
final OutboundQueueEntry entry = getEntry(xid);
- entry.commit(message, callback);
+ entry.commit(message, callback, isCompletedFunction);
if (entry.isBarrier()) {
long my = xid;
for (;;) {
}
Long reserveBarrierIfNeeded() {
+ if (isBarrierNeeded()) {
+ return reserveEntry();
+ }
+ return null;
+ }
+
+ /**
+ * Checks if Barrier Request is the last message enqueued. If not, one needs
+ * to be scheduled in order to collect data about previous messages.
+ * @return true if last enqueued message is Barrier Request, false otherwise
+ */
+ boolean isBarrierNeeded() {
final long bXid = barrierXid;
final long fXid = firstSegment.getBaseXid() + flushOffset;
if (bXid >= fXid) {
LOG.debug("Barrier found at XID {} (currently at {})", bXid, fXid);
- return null;
+ return false;
}
- return reserveEntry();
+ return true;
}
}