From 41c94f361e680fd1676cb2552dddce2896632e88 Mon Sep 17 00:00:00 2001 From: Tomas Slusny Date: Thu, 26 Jan 2017 11:59:01 +0100 Subject: [PATCH] Add isComplete callback to commitEntry This callback will determine, if processed message is completed or not. Change-Id: I29a0faeaa1b3965a88e4d4516e19aee34bc5bf71 Signed-off-by: Tomas Slusny --- .../api/connection/OutboundQueue.java | 36 ++++++++++++++++++- .../AbstractStackedOutboundQueue.java | 14 +++++++- .../core/connection/OutboundQueueEntry.java | 32 +++++++++++++---- .../core/connection/StackedOutboundQueue.java | 9 +++-- .../StackedOutboundQueueNoBarrier.java | 15 ++++++-- 5 files changed, 92 insertions(+), 14 deletions(-) diff --git a/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueue.java b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueue.java index 3212078c..3b9e7752 100644 --- a/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueue.java +++ b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueue.java @@ -9,6 +9,7 @@ package org.opendaylight.openflowjava.protocol.api.connection; import com.google.common.annotations.Beta; import com.google.common.util.concurrent.FutureCallback; +import java.util.function.Function; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; @@ -44,5 +45,38 @@ public interface OutboundQueue { * @param callback Callback to be invoked, or null if no callback should be invoked. * @throws IllegalArgumentException if the slot is already committed or was never reserved. */ - void commitEntry(@Nonnull Long xid, @Nullable OfHeader message, @Nullable FutureCallback callback); + void commitEntry( + @Nonnull Long xid, + @Nullable OfHeader message, + @Nullable FutureCallback callback); + + /** + * Commit the specified offset using a message. Specified callback will + * be invoked once we know how it has resolved, either with a normal response, + * implied completion via a barrier, or failure (such as connection drop). For + * multipart responses, {@link FutureCallback#onSuccess(Object)} will be invoked + * multiple times as the corresponding responses arrive. If the request is completed + * with a response, the object reported will be non-null. If the request's completion + * is implied by a barrier, the object reported will be null. + * + * If this request fails on the remote device, {@link FutureCallback#onFailure(Throwable)} + * will be called with an instance of {@link DeviceRequestFailedException}. + * + * If the request fails due to local reasons, {@link FutureCallback#onFailure(Throwable)} + * will be called with an instance of {@link OutboundQueueException}. In particular, if + * this request failed because the device disconnected, {@link OutboundQueueException#DEVICE_DISCONNECTED} + * will be reported. + * + * @param xid Previously-reserved XID + * @param message Message which should be sent out, or null if the reservation + * should be cancelled. + * @param callback Callback to be invoked, or null if no callback should be invoked. + * @param isComplete Function to determine if OfHeader is processing is complete + * @throws IllegalArgumentException if the slot is already committed or was never reserved. + */ + void commitEntry( + @Nonnull Long xid, + @Nullable OfHeader message, + @Nullable FutureCallback callback, + @Nullable Function isComplete); } diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java index 16106a1a..b4356ee4 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java @@ -10,23 +10,30 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection; import com.google.common.base.Preconditions; import com.google.common.base.Verify; +import com.google.common.util.concurrent.FutureCallback; + import io.netty.channel.Channel; + import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.function.Function; + import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; + import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; abstract class AbstractStackedOutboundQueue implements OutboundQueue { private static final Logger LOG = LoggerFactory.getLogger(AbstractStackedOutboundQueue.class); - protected static final AtomicLongFieldUpdater LAST_XID_OFFSET_UPDATER = AtomicLongFieldUpdater .newUpdater(AbstractStackedOutboundQueue.class, "lastXid"); @@ -55,6 +62,11 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue { unflushedSegments.add(firstSegment); } + @Override + public void commitEntry(final Long xid, final OfHeader message, final FutureCallback callback) { + commitEntry(xid, message, callback, OutboundQueueEntry.DEFAULT_IS_COMPLETE); + } + @GuardedBy("unflushedSegments") protected void ensureSegment(final StackedSegment first, final int offset) { final int segmentOffset = offset / StackedSegment.SEGMENT_SIZE; diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java index 72efc18a..d88566b8 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java @@ -10,24 +10,47 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; + +import java.util.function.Function; + import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; final class OutboundQueueEntry { private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueEntry.class); + public static final Function DEFAULT_IS_COMPLETE = new Function() { + + @Override + public Boolean apply(final OfHeader message) { + if (message instanceof MultipartReplyMessage) { + return !((MultipartReplyMessage) message).getFlags().isOFPMPFREQMORE(); + } + + return true; + } + + }; + private FutureCallback callback; private OfHeader message; private boolean completed; private boolean barrier; private volatile boolean committed; private OutboundQueueException lastException = null; + private Function isCompletedFunction = DEFAULT_IS_COMPLETE; void commit(final OfHeader message, final FutureCallback callback) { + commit(message, callback, DEFAULT_IS_COMPLETE); + } + + void commit(final OfHeader message, final FutureCallback callback, + final Function isCompletedFunction) { if (this.completed) { LOG.warn("Can't commit a completed message."); if (callback != null) { @@ -37,6 +60,7 @@ final class OutboundQueueEntry { this.message = message; this.callback = callback; this.barrier = message instanceof BarrierInput; + this.isCompletedFunction = isCompletedFunction; // Volatile write, needs to be last this.committed = true; @@ -90,13 +114,7 @@ final class OutboundQueueEntry { // Multipart requests are special, we have to look at them to see // if there is something outstanding and adjust ourselves accordingly - final boolean reallyComplete; - if (response instanceof MultipartReplyMessage) { - reallyComplete = !((MultipartReplyMessage) response).getFlags().isOFPMPFREQMORE(); - LOG.debug("Multipart reply {}", response); - } else { - reallyComplete = true; - } + final boolean reallyComplete = isCompletedFunction.apply(response); completed = reallyComplete; if (callback != null) { diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java index cafd114c..dd1e9520 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java @@ -8,8 +8,12 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection; import com.google.common.util.concurrent.FutureCallback; + import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.function.Function; + import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,10 +31,11 @@ final class StackedOutboundQueue extends AbstractStackedOutboundQueue { * This method is expected to be called from multiple threads concurrently */ @Override - public void commitEntry(final Long xid, final OfHeader message, final FutureCallback callback) { + public void commitEntry(final Long xid, final OfHeader message, final FutureCallback callback, + final Function isCompletedFunction) { final OutboundQueueEntry entry = getEntry(xid); - entry.commit(message, callback); + entry.commit(message, callback, isCompletedFunction); if (entry.isBarrier()) { long my = xid; for (;;) { diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueueNoBarrier.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueueNoBarrier.java index 2917631a..76b1243a 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueueNoBarrier.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueueNoBarrier.java @@ -9,10 +9,17 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection; import com.google.common.util.concurrent.FutureCallback; + import io.netty.channel.Channel; + +import java.util.function.Function; + import javax.annotation.Nonnull; + import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,14 +39,15 @@ public class StackedOutboundQueueNoBarrier extends AbstractStackedOutboundQueue * This method is expected to be called from multiple threads concurrently */ @Override - public void commitEntry(final Long xid, final OfHeader message, final FutureCallback callback) { + public void commitEntry(final Long xid, final OfHeader message, final FutureCallback callback, + final Function isCompletedFunction) { final OutboundQueueEntry entry = getEntry(xid); if (message instanceof FlowModInput) { callback.onSuccess(null); - entry.commit(message, null); + entry.commit(message, null, isCompletedFunction); } else { - entry.commit(message, callback); + entry.commit(message, callback, isCompletedFunction); } LOG.trace("Queue {} committed XID {}", this, xid); @@ -111,4 +119,5 @@ public class StackedOutboundQueueNoBarrier extends AbstractStackedOutboundQueue return entries; } + } -- 2.36.6