Barrier turn on/off - move more functionality from StackedOutboundQueue
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueEntry.java
index c0c2f764f3aa88c9b12da74ed5878f298d74fa0b..4b8820bd9b430317e7dd4a6b0ddd6d65d091e0ea 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
@@ -20,27 +21,30 @@ final class OutboundQueueEntry {
     private FutureCallback<OfHeader> callback;
     private OfHeader message;
     private boolean completed;
+    private boolean barrier;
     private volatile boolean committed;
 
     void commit(final OfHeader message, final FutureCallback<OfHeader> callback) {
         this.message = message;
         this.callback = callback;
+        this.barrier = message instanceof BarrierInput;
 
         // Volatile write, needs to be last
         committed = true;
     }
 
     void reset() {
+        barrier = false;
         callback = null;
-        message = null;
         completed = false;
+        message = null;
 
         // Volatile write, needs to be last
         committed = false;
     }
 
     boolean isBarrier() {
-        return message instanceof BarrierInput;
+        return barrier;
     }
 
     boolean isCommitted() {
@@ -51,12 +55,14 @@ final class OutboundQueueEntry {
         return completed;
     }
 
-    OfHeader getMessage() {
-        return message;
+    OfHeader takeMessage() {
+        final OfHeader ret = message;
+        message = null;
+        return ret;
     }
 
     boolean complete(final OfHeader response) {
-        Preconditions.checkState(!completed, "Attempted to complete a completed message %s with response %s", message, response);
+        Preconditions.checkState(!completed, "Attempted to complete a completed message with response %s", response);
 
         // Multipart requests are special, we have to look at them to see
         // if there is something outstanding and adjust ourselves accordingly
@@ -71,20 +77,24 @@ final class OutboundQueueEntry {
         completed = reallyComplete;
         if (callback != null) {
             callback.onSuccess(response);
+            if (reallyComplete) {
+                // We will not need the callback anymore, make sure it can be GC'd
+                callback = null;
+            }
         }
         LOG.debug("Entry {} completed {} with response {}", this, completed, response);
         return reallyComplete;
     }
 
-    void fail(final Throwable cause) {
+    void fail(final OutboundQueueException cause) {
         if (!completed) {
             completed = true;
             if (callback != null) {
                 callback.onFailure(cause);
+                callback = null;
             }
         } else {
-            LOG.warn("Ignoring failure {} for completed message {}", cause, message);
+            LOG.warn("Ignoring failure {} for completed message", cause);
         }
     }
-
 }