*/
package org.opendaylight.openflowjava.protocol.impl.core.connection;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
+
+import java.util.function.Function;
+
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final class OutboundQueueEntry {
private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueEntry.class);
+ public static final Function<OfHeader, Boolean> DEFAULT_IS_COMPLETE = new Function<OfHeader, Boolean>() {
+
+ @Override
+ public Boolean apply(final OfHeader message) {
+ if (message instanceof MultipartReplyMessage) {
+ return !((MultipartReplyMessage) message).getFlags().isOFPMPFREQMORE();
+ }
+
+ return true;
+ }
+
+ };
+
private FutureCallback<OfHeader> callback;
private OfHeader message;
private boolean completed;
+ private boolean barrier;
private volatile boolean committed;
+ private OutboundQueueException lastException = null;
+ private Function<OfHeader, Boolean> isCompletedFunction = DEFAULT_IS_COMPLETE;
void commit(final OfHeader message, final FutureCallback<OfHeader> callback) {
- this.message = message;
- this.callback = callback;
+ commit(message, callback, DEFAULT_IS_COMPLETE);
+ }
- // Volatile write, needs to be last
- committed = true;
+ void commit(final OfHeader message, final FutureCallback<OfHeader> callback,
+ final Function<OfHeader, Boolean> isCompletedFunction) {
+ if (this.completed) {
+ LOG.warn("Can't commit a completed message.");
+ if (callback != null) {
+ callback.onFailure(lastException);
+ }
+ } else {
+ this.message = message;
+ this.callback = callback;
+ this.barrier = message instanceof BarrierInput;
+ this.isCompletedFunction = isCompletedFunction;
+
+ // Volatile write, needs to be last
+ this.committed = true;
+ }
}
void reset() {
+ barrier = false;
callback = null;
- message = null;
completed = false;
+ message = null;
// Volatile write, needs to be last
committed = false;
}
boolean isBarrier() {
- return message instanceof BarrierInput;
+ return barrier;
}
boolean isCommitted() {
return completed;
}
- OfHeader getMessage() {
- return message;
+ OfHeader takeMessage() {
+ final OfHeader ret = message;
+ if (!barrier) {
+ checkCompletionNeed();
+ }
+ message = null;
+ return ret;
+ }
+
+ private void checkCompletionNeed() {
+ if (callback == null || (message instanceof PacketOutInput)) {
+ completed = true;
+ if (callback != null) {
+ callback.onSuccess(null);
+ callback = null;
+ }
+ committed = false;
+ }
}
boolean complete(final OfHeader response) {
- Preconditions.checkState(!completed, "Attempted to complete a completed message %s with response %s", message, response);
+ Preconditions.checkState(!completed, "Attempted to complete a completed message with response %s", response);
// Multipart requests are special, we have to look at them to see
// if there is something outstanding and adjust ourselves accordingly
- final boolean reallyComplete;
- if (response instanceof MultipartReplyMessage) {
- reallyComplete = !((MultipartReplyMessage) response).getFlags().isOFPMPFREQMORE();
- LOG.debug("Multipart reply {}", response);
- } else {
- reallyComplete = true;
- }
+ final boolean reallyComplete = isCompletedFunction.apply(response);
completed = reallyComplete;
if (callback != null) {
callback.onSuccess(response);
+ if (reallyComplete) {
+ // We will not need the callback anymore, make sure it can be GC'd
+ callback = null;
+ }
}
LOG.debug("Entry {} completed {} with response {}", this, completed, response);
return reallyComplete;
void fail(final OutboundQueueException cause) {
if (!completed) {
+ lastException = cause;
completed = true;
if (callback != null) {
callback.onFailure(cause);
+ callback = null;
}
} else {
- LOG.warn("Ignoring failure {} for completed message {}", cause, message);
+ LOG.warn("Ignoring failure {} for completed message", cause);
}
}
+ @VisibleForTesting
+ /** This method is only for testing to prove that after queue entry is completed there is not callback future */
+ boolean hasCallback() {
+ return (callback != null);
+ }
}