bug 2446 - High priority (control) queue stop reading from channel if is full
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperFactory.java
index 0c856cff298e2f502090d6a4815f0f9529dbea9a..1143afa1def797c68a85f1f44f02b69eba2c5222 100644 (file)
@@ -9,37 +9,50 @@ package org.opendaylight.openflowplugin.openflow.md.queue;
 
 import org.opendaylight.openflowplugin.api.openflow.md.queue.MessageSourcePollRegistrator;
 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 
 /**
- * factory for {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper} implementations
+ * factory for
+ * {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper}
+ * implementations
  */
 public abstract class QueueKeeperFactory {
-    
+
     /**
-     * @param sourceRegistrator 
-     * @param capacity blocking queue capacity
-     * @return fair reading implementation of {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper} (not registered = not started yet)
+     * @param sourceRegistrator
+     * @param capacity
+     *            blocking queue capacity
+     * @param waterMarkListener
+     * @return fair reading implementation of
+     *         {@link org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper}
+     *         (not registered = not started yet)
      */
     public static QueueKeeper<OfHeader> createFairQueueKeeper(
-            MessageSourcePollRegistrator<QueueKeeper<OfHeader>> sourceRegistrator, int capacity) {
+            MessageSourcePollRegistrator<QueueKeeper<OfHeader>> sourceRegistrator,
+            int capacity, WaterMarkListener waterMarkListener) {
         QueueKeeperFairImpl queueKeeper = new QueueKeeperFairImpl();
         queueKeeper.setCapacity(capacity);
         queueKeeper.setHarvesterHandle(sourceRegistrator.getHarvesterHandle());
+        queueKeeper.setWaterMarkListener(waterMarkListener);
         queueKeeper.init();
-        
+
         return queueKeeper;
     }
 
     /**
-     * register queue by harvester, start processing it. Use {@link QueueKeeperFairImpl#close()} to kill the queue and stop processing. 
+     * register queue by harvester, start processing it. Use
+     * {@link QueueKeeperFairImpl#close()} to kill the queue and stop
+     * processing.
+     * 
      * @param sourceRegistrator
      * @param queueKeeper
      */
     public static <V> void plugQueue(
             MessageSourcePollRegistrator<QueueKeeper<V>> sourceRegistrator,
             QueueKeeper<V> queueKeeper) {
-        AutoCloseable registration = sourceRegistrator.registerMessageSource(queueKeeper);
+        AutoCloseable registration = sourceRegistrator
+                .registerMessageSource(queueKeeper);
         queueKeeper.setPollRegistration(registration);
         sourceRegistrator.getHarvesterHandle().ping();
     }