bug 2446 - High priority (control) queue stop reading from channel if is full
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperFairImpl.java
index dbf7d4d2d330ef9c44092cfbe66d09b9230d144f..b24ba9f74969bd33b7c6e84cea017955ced42655 100644 (file)
@@ -15,6 +15,7 @@ import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener;
 import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.slf4j.Logger;
@@ -37,6 +38,8 @@ public class QueueKeeperFairImpl implements QueueKeeper<OfHeader> {
     private HarvesterHandle harvesterHandle;
     private PollableQueuesPriorityZipper<QueueItem<OfHeader>> queueZipper;
 
+    private WaterMarkListener waterMarkListener;
+
     @Override
     public void close() throws Exception {
         Preconditions.checkNotNull(pollRegistration,
@@ -77,8 +80,7 @@ public class QueueKeeperFairImpl implements QueueKeeper<OfHeader> {
      */
     @Override
     public QueueItem<OfHeader> poll() {
-        QueueItem<OfHeader> nextQueueItem = queueZipper.poll();
-        return nextQueueItem;
+        return queueZipper.poll();
     }
 
     /**
@@ -102,11 +104,18 @@ public class QueueKeeperFairImpl implements QueueKeeper<OfHeader> {
      * init blocking queue
      */
     public void init() {
+        Preconditions.checkNotNull(waterMarkListener);
         queueUnordered = new ArrayBlockingQueue<>(capacity);
         queueDefault = new ArrayBlockingQueue<>(capacity);
+        WrapperQueueImpl<QueueItem<OfHeader>> wrapperQueue = new WrapperQueueImpl<>(
+                capacity, queueDefault, waterMarkListener);
         queueZipper = new PollableQueuesPriorityZipper<>();
         queueZipper.addSource(queueUnordered);
-        queueZipper.setPrioritizedSource(queueDefault);
+        queueZipper.setPrioritizedSource(wrapperQueue);
+    }
+
+    public void setWaterMarkListener(WaterMarkListener waterMarkListener) {
+        this.waterMarkListener = waterMarkListener;
     }
 
     /**