import com.google.common.annotations.Beta;
import com.google.common.util.concurrent.FutureCallback;
+import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
* @param callback Callback to be invoked, or null if no callback should be invoked.
* @throws IllegalArgumentException if the slot is already committed or was never reserved.
*/
- void commitEntry(@Nonnull Long xid, @Nullable OfHeader message, @Nullable FutureCallback<OfHeader> callback);
+ void commitEntry(
+ @Nonnull Long xid,
+ @Nullable OfHeader message,
+ @Nullable FutureCallback<OfHeader> callback);
+
+ /**
+ * Commit the specified offset using a message. Specified callback will
+ * be invoked once we know how it has resolved, either with a normal response,
+ * implied completion via a barrier, or failure (such as connection drop). For
+ * multipart responses, {@link FutureCallback#onSuccess(Object)} will be invoked
+ * multiple times as the corresponding responses arrive. If the request is completed
+ * with a response, the object reported will be non-null. If the request's completion
+ * is implied by a barrier, the object reported will be null.
+ *
+ * If this request fails on the remote device, {@link FutureCallback#onFailure(Throwable)}
+ * will be called with an instance of {@link DeviceRequestFailedException}.
+ *
+ * If the request fails due to local reasons, {@link FutureCallback#onFailure(Throwable)}
+ * will be called with an instance of {@link OutboundQueueException}. In particular, if
+ * this request failed because the device disconnected, {@link OutboundQueueException#DEVICE_DISCONNECTED}
+ * will be reported.
+ *
+ * @param xid Previously-reserved XID
+ * @param message Message which should be sent out, or null if the reservation
+ * should be cancelled.
+ * @param callback Callback to be invoked, or null if no callback should be invoked.
+ * @param isComplete Function to determine if OfHeader is processing is complete
+ * @throws IllegalArgumentException if the slot is already committed or was never reserved.
+ */
+ void commitEntry(
+ @Nonnull Long xid,
+ @Nullable OfHeader message,
+ @Nullable FutureCallback<OfHeader> callback,
+ @Nullable Function<OfHeader, Boolean> isComplete);
}
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
+import com.google.common.util.concurrent.FutureCallback;
+
import io.netty.channel.Channel;
+
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.Function;
+
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
+
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class AbstractStackedOutboundQueue implements OutboundQueue {
private static final Logger LOG = LoggerFactory.getLogger(AbstractStackedOutboundQueue.class);
-
protected static final AtomicLongFieldUpdater<AbstractStackedOutboundQueue> LAST_XID_OFFSET_UPDATER = AtomicLongFieldUpdater
.newUpdater(AbstractStackedOutboundQueue.class, "lastXid");
unflushedSegments.add(firstSegment);
}
+ @Override
+ public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
+ commitEntry(xid, message, callback, OutboundQueueEntry.DEFAULT_IS_COMPLETE);
+ }
+
@GuardedBy("unflushedSegments")
protected void ensureSegment(final StackedSegment first, final int offset) {
final int segmentOffset = offset / StackedSegment.SEGMENT_SIZE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
+
+import java.util.function.Function;
+
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final class OutboundQueueEntry {
private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueEntry.class);
+ public static final Function<OfHeader, Boolean> DEFAULT_IS_COMPLETE = new Function<OfHeader, Boolean>() {
+
+ @Override
+ public Boolean apply(final OfHeader message) {
+ if (message instanceof MultipartReplyMessage) {
+ return !((MultipartReplyMessage) message).getFlags().isOFPMPFREQMORE();
+ }
+
+ return true;
+ }
+
+ };
+
private FutureCallback<OfHeader> callback;
private OfHeader message;
private boolean completed;
private boolean barrier;
private volatile boolean committed;
private OutboundQueueException lastException = null;
+ private Function<OfHeader, Boolean> isCompletedFunction = DEFAULT_IS_COMPLETE;
void commit(final OfHeader message, final FutureCallback<OfHeader> callback) {
+ commit(message, callback, DEFAULT_IS_COMPLETE);
+ }
+
+ void commit(final OfHeader message, final FutureCallback<OfHeader> callback,
+ final Function<OfHeader, Boolean> isCompletedFunction) {
if (this.completed) {
LOG.warn("Can't commit a completed message.");
if (callback != null) {
this.message = message;
this.callback = callback;
this.barrier = message instanceof BarrierInput;
+ this.isCompletedFunction = isCompletedFunction;
// Volatile write, needs to be last
this.committed = true;
// Multipart requests are special, we have to look at them to see
// if there is something outstanding and adjust ourselves accordingly
- final boolean reallyComplete;
- if (response instanceof MultipartReplyMessage) {
- reallyComplete = !((MultipartReplyMessage) response).getFlags().isOFPMPFREQMORE();
- LOG.debug("Multipart reply {}", response);
- } else {
- reallyComplete = true;
- }
+ final boolean reallyComplete = isCompletedFunction.apply(response);
completed = reallyComplete;
if (callback != null) {
package org.opendaylight.openflowjava.protocol.impl.core.connection;
import com.google.common.util.concurrent.FutureCallback;
+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.Function;
+
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* This method is expected to be called from multiple threads concurrently
*/
@Override
- public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
+ public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback,
+ final Function<OfHeader, Boolean> isCompletedFunction) {
final OutboundQueueEntry entry = getEntry(xid);
- entry.commit(message, callback);
+ entry.commit(message, callback, isCompletedFunction);
if (entry.isBarrier()) {
long my = xid;
for (;;) {
package org.opendaylight.openflowjava.protocol.impl.core.connection;
import com.google.common.util.concurrent.FutureCallback;
+
import io.netty.channel.Channel;
+
+import java.util.function.Function;
+
import javax.annotation.Nonnull;
+
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* This method is expected to be called from multiple threads concurrently
*/
@Override
- public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
+ public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback,
+ final Function<OfHeader, Boolean> isCompletedFunction) {
final OutboundQueueEntry entry = getEntry(xid);
if (message instanceof FlowModInput) {
callback.onSuccess(null);
- entry.commit(message, null);
+ entry.commit(message, null, isCompletedFunction);
} else {
- entry.commit(message, callback);
+ entry.commit(message, callback, isCompletedFunction);
}
LOG.trace("Queue {} committed XID {}", this, xid);
return entries;
}
+
}