Add isComplete callback to commitEntry 64/51064/4
authorTomas Slusny <tomas.slusny@pantheon.tech>
Thu, 26 Jan 2017 10:59:01 +0000 (11:59 +0100)
committerTomas Slusny <tomas.slusny@pantheon.tech>
Thu, 26 Jan 2017 16:05:40 +0000 (17:05 +0100)
This callback will determine, if processed message
is completed or not.

Change-Id: I29a0faeaa1b3965a88e4d4516e19aee34bc5bf71
Signed-off-by: Tomas Slusny <tomas.slusny@pantheon.tech>
openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueue.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractStackedOutboundQueue.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueue.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/StackedOutboundQueueNoBarrier.java

index 3212078c6bc2655ba4382d09f5c9c0231a3c6f1a..3b9e7752b9a80e5032055b5c3728709b437f3a13 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.openflowjava.protocol.api.connection;
 
 import com.google.common.annotations.Beta;
 import com.google.common.util.concurrent.FutureCallback;
+import java.util.function.Function;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
@@ -44,5 +45,38 @@ public interface OutboundQueue {
      * @param callback Callback to be invoked, or null if no callback should be invoked.
      * @throws IllegalArgumentException if the slot is already committed or was never reserved.
      */
-    void commitEntry(@Nonnull Long xid, @Nullable OfHeader message, @Nullable FutureCallback<OfHeader> callback);
+    void commitEntry(
+            @Nonnull Long xid,
+            @Nullable OfHeader message,
+            @Nullable FutureCallback<OfHeader> callback);
+
+    /**
+     * Commit the specified offset using a message. Specified callback will
+     * be invoked once we know how it has resolved, either with a normal response,
+     * implied completion via a barrier, or failure (such as connection drop). For
+     * multipart responses, {@link FutureCallback#onSuccess(Object)} will be invoked
+     * multiple times as the corresponding responses arrive. If the request is completed
+     * with a response, the object reported will be non-null. If the request's completion
+     * is implied by a barrier, the object reported will be null.
+     *
+     * If this request fails on the remote device, {@link FutureCallback#onFailure(Throwable)}
+     * will be called with an instance of {@link DeviceRequestFailedException}.
+     *
+     * If the request fails due to local reasons, {@link FutureCallback#onFailure(Throwable)}
+     * will be called with an instance of {@link OutboundQueueException}. In particular, if
+     * this request failed because the device disconnected, {@link OutboundQueueException#DEVICE_DISCONNECTED}
+     * will be reported.
+     *
+     * @param xid Previously-reserved XID
+     * @param message Message which should be sent out, or null if the reservation
+     *                should be cancelled.
+     * @param callback Callback to be invoked, or null if no callback should be invoked.
+     * @param isComplete Function to determine if OfHeader is processing is complete
+     * @throws IllegalArgumentException if the slot is already committed or was never reserved.
+     */
+    void commitEntry(
+            @Nonnull Long xid,
+            @Nullable OfHeader message,
+            @Nullable FutureCallback<OfHeader> callback,
+            @Nullable Function<OfHeader, Boolean> isComplete);
 }
index 16106a1a4ee8a17ff5621e02ebe40b7e5f2263da..b4356ee41ef03d88bcea34a5db87137fab95d8a5 100644 (file)
@@ -10,23 +10,30 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
+import com.google.common.util.concurrent.FutureCallback;
+
 import io.netty.channel.Channel;
+
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.Function;
+
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
+
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 abstract class AbstractStackedOutboundQueue implements OutboundQueue {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractStackedOutboundQueue.class);
-
     protected static final AtomicLongFieldUpdater<AbstractStackedOutboundQueue> LAST_XID_OFFSET_UPDATER = AtomicLongFieldUpdater
             .newUpdater(AbstractStackedOutboundQueue.class, "lastXid");
 
@@ -55,6 +62,11 @@ abstract class AbstractStackedOutboundQueue implements OutboundQueue {
         unflushedSegments.add(firstSegment);
     }
 
+    @Override
+    public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
+        commitEntry(xid, message, callback, OutboundQueueEntry.DEFAULT_IS_COMPLETE);
+    }
+
     @GuardedBy("unflushedSegments")
     protected void ensureSegment(final StackedSegment first, final int offset) {
         final int segmentOffset = offset / StackedSegment.SEGMENT_SIZE;
index 72efc18a15c56b76fa17e7b5d356e8c52c5f45bc..d88566b8d3635efc24309b4c28ed791577ac7331 100644 (file)
@@ -10,24 +10,47 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
+
+import java.util.function.Function;
+
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 final class OutboundQueueEntry {
     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueEntry.class);
+    public static final Function<OfHeader, Boolean> DEFAULT_IS_COMPLETE = new Function<OfHeader, Boolean>() {
+
+        @Override
+        public Boolean apply(final OfHeader message) {
+            if (message instanceof MultipartReplyMessage) {
+                return !((MultipartReplyMessage) message).getFlags().isOFPMPFREQMORE();
+            }
+
+            return true;
+        }
+
+    };
+
     private FutureCallback<OfHeader> callback;
     private OfHeader message;
     private boolean completed;
     private boolean barrier;
     private volatile boolean committed;
     private OutboundQueueException lastException = null;
+    private Function<OfHeader, Boolean> isCompletedFunction = DEFAULT_IS_COMPLETE;
 
     void commit(final OfHeader message, final FutureCallback<OfHeader> callback) {
+        commit(message, callback, DEFAULT_IS_COMPLETE);
+    }
+
+    void commit(final OfHeader message, final FutureCallback<OfHeader> callback,
+            final Function<OfHeader, Boolean> isCompletedFunction) {
         if (this.completed) {
             LOG.warn("Can't commit a completed message.");
             if (callback != null) {
@@ -37,6 +60,7 @@ final class OutboundQueueEntry {
             this.message = message;
             this.callback = callback;
             this.barrier = message instanceof BarrierInput;
+            this.isCompletedFunction = isCompletedFunction;
 
             // Volatile write, needs to be last
             this.committed = true;
@@ -90,13 +114,7 @@ final class OutboundQueueEntry {
 
         // Multipart requests are special, we have to look at them to see
         // if there is something outstanding and adjust ourselves accordingly
-        final boolean reallyComplete;
-        if (response instanceof MultipartReplyMessage) {
-            reallyComplete = !((MultipartReplyMessage) response).getFlags().isOFPMPFREQMORE();
-            LOG.debug("Multipart reply {}", response);
-        } else {
-            reallyComplete = true;
-        }
+        final boolean reallyComplete = isCompletedFunction.apply(response);
 
         completed = reallyComplete;
         if (callback != null) {
index cafd114c1a021219a5f9af69c103771253e19332..dd1e952002a74e3a5933ace7f3e48fbf8f819bea 100644 (file)
@@ -8,8 +8,12 @@
 package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
 import com.google.common.util.concurrent.FutureCallback;
+
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.Function;
+
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,10 +31,11 @@ final class StackedOutboundQueue extends AbstractStackedOutboundQueue {
      * This method is expected to be called from multiple threads concurrently
      */
     @Override
-    public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
+    public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback,
+            final Function<OfHeader, Boolean> isCompletedFunction) {
         final OutboundQueueEntry entry = getEntry(xid);
 
-        entry.commit(message, callback);
+        entry.commit(message, callback, isCompletedFunction);
         if (entry.isBarrier()) {
             long my = xid;
             for (;;) {
index 2917631af071d52e6d651696209c4604fb89d7b6..76b1243a7b5def846049bd5512639571074f003d 100644 (file)
@@ -9,10 +9,17 @@
 package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
 import com.google.common.util.concurrent.FutureCallback;
+
 import io.netty.channel.Channel;
+
+import java.util.function.Function;
+
 import javax.annotation.Nonnull;
+
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,14 +39,15 @@ public class StackedOutboundQueueNoBarrier extends AbstractStackedOutboundQueue
      * This method is expected to be called from multiple threads concurrently
      */
     @Override
-    public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
+    public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback,
+            final Function<OfHeader, Boolean> isCompletedFunction) {
         final OutboundQueueEntry entry = getEntry(xid);
 
         if (message instanceof FlowModInput) {
             callback.onSuccess(null);
-            entry.commit(message, null);
+            entry.commit(message, null, isCompletedFunction);
         } else {
-            entry.commit(message, callback);
+            entry.commit(message, callback, isCompletedFunction);
         }
 
         LOG.trace("Queue {} committed XID {}", this, xid);
@@ -111,4 +119,5 @@ public class StackedOutboundQueueNoBarrier extends AbstractStackedOutboundQueue
 
         return entries;
     }
+
 }