import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener;
import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.slf4j.Logger;
private HarvesterHandle harvesterHandle;
private PollableQueuesPriorityZipper<QueueItem<OfHeader>> queueZipper;
+ private WaterMarkListener waterMarkListener;
+
@Override
public void close() throws Exception {
Preconditions.checkNotNull(pollRegistration,
*/
@Override
public QueueItem<OfHeader> poll() {
- QueueItem<OfHeader> nextQueueItem = queueZipper.poll();
- return nextQueueItem;
+ return queueZipper.poll();
}
/**
* init blocking queue
*/
public void init() {
+ Preconditions.checkNotNull(waterMarkListener);
queueUnordered = new ArrayBlockingQueue<>(capacity);
queueDefault = new ArrayBlockingQueue<>(capacity);
+ WrapperQueueImpl<QueueItem<OfHeader>> wrapperQueue = new WrapperQueueImpl<>(
+ capacity, queueDefault, waterMarkListener);
queueZipper = new PollableQueuesPriorityZipper<>();
queueZipper.addSource(queueUnordered);
- queueZipper.setPrioritizedSource(queueDefault);
+ queueZipper.setPrioritizedSource(wrapperQueue);
+ }
+
+ public void setWaterMarkListener(WaterMarkListener waterMarkListener) {
+ this.waterMarkListener = waterMarkListener;
}
/**