2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.openflow.md.queue;
10 import java.util.Queue;
11 import java.util.concurrent.ArrayBlockingQueue;
12 import java.util.concurrent.BlockingQueue;
14 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
15 import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
16 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
17 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
18 import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener;
19 import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
24 import com.google.common.base.Preconditions;
27 * QueueKeeper implementation based on {@link OfHeader}
29 public class QueueKeeperFairImpl implements QueueKeeper<OfHeader> {
// Logger for this class.
// NOTE(review): declared static but not final; a logger constant would normally be final.
31 private static Logger LOG = LoggerFactory
32 .getLogger(QueueKeeperFairImpl.class);
// Queue that push() offers items to; created as an ArrayBlockingQueue by the
// initialization code and handed to a WrapperQueueImpl there.
34 private Queue<QueueItem<OfHeader>> queueDefault;
// Second queue that push() offers items to; registered with the zipper via addSource().
35 private BlockingQueue<QueueItem<OfHeader>> queueUnordered;
// Registration stored by setPollRegistration() and closed by close().
36 private AutoCloseable pollRegistration;
// Size used for both queues and the wrapper when they are created; defaults to 5000.
37 private int capacity = 5000;
// Handle that push() pings; supplied through setHarvesterHandle().
38 private HarvesterHandle harvesterHandle;
// Zipper over the two queues; poll() delegates to it.
39 private PollableQueuesPriorityZipper<QueueItem<OfHeader>> queueZipper;
// Listener passed to the WrapperQueueImpl; must be set before the initialization code runs.
41 private WaterMarkListener waterMarkListener;
// Closes the stored poll registration.
// The checkNotNull guard rejects the call when no registration has been set,
// using the message "pollRegistration not available".
44 public void close() throws Exception {
45 Preconditions.checkNotNull(pollRegistration,
46 "pollRegistration not available");
47 pollRegistration.close();
// Wraps the message and its conductor in a QueueItemOFImpl and offers the item
// to one of the two internal queues.
// NOTE(review): the statements that branch on queueType and on 'enqueued' are not
// visible in this listing - confirm the exact branching against the full file.
51 public void push(OfHeader message, ConnectionConductor conductor,
52 QueueKeeper.QueueType queueType) {
53 QueueItemOFImpl qItem = new QueueItemOFImpl(message, conductor,
// Remains false unless one of the offer() calls below reports success.
55 boolean enqueued = false;
// offer() reports whether the queue accepted the item; the result is kept in 'enqueued'.
59 enqueued = queueDefault.offer(qItem);
62 enqueued = queueUnordered.offer(qItem);
// Logged for a queue type that has no matching queue; nothing is offered on this path.
65 LOG.warn("unsupported queue type: [{}] -> dropping message [{}]",
66 queueType, message.getImplementedInterface());
// Notifies the harvester handle - presumably only after a successful offer; the
// guarding condition is not visible here, TODO confirm.
70 harvesterHandle.ping();
72 LOG.debug("ingress throttling is use -> {}", queueType);
75 // if enqueueing fails -> message will be dropped
79 * @return the next item polled from the queue zipper
// Delegates to queueZipper.poll() and returns its result unchanged.
82 public QueueItem<OfHeader> poll() {
83 return queueZipper.poll();
87 * @param processingRegistration
88 * the processingRegistration to set
// Stores the registration that close() later closes.
91 public void setPollRegistration(AutoCloseable processingRegistration) {
92 this.pollRegistration = processingRegistration;
97 * the capacity of the internal blocking queues
// Overrides the default capacity. The queues are sized from this field when they
// are created, so the value has to be set before the initialization code runs.
99 public void setCapacity(int capacity) {
100 this.capacity = capacity;
104 * init blocking queues and the queue zipper
// Initialization body: requires a water mark listener, then builds both queues,
// the wrapper around the default queue, and the zipper that poll() reads from.
// NOTE(review): the method header is not visible in this listing.
107 Preconditions.checkNotNull(waterMarkListener);
// Both queues are ArrayBlockingQueues bounded by 'capacity'.
108 queueUnordered = new ArrayBlockingQueue<>(capacity);
109 queueDefault = new ArrayBlockingQueue<>(capacity);
// The wrapper receives the capacity, the default queue and the water mark listener.
110 WrapperQueueImpl<QueueItem<OfHeader>> wrapperQueue = new WrapperQueueImpl<>(
111 capacity, queueDefault, waterMarkListener);
// The unordered queue is added as a regular source; the wrapped default queue is
// set as the prioritized source.
112 queueZipper = new PollableQueuesPriorityZipper<>();
113 queueZipper.addSource(queueUnordered);
114 queueZipper.setPrioritizedSource(wrapperQueue);
// Stores the listener that the initialization code checks for null and passes to
// the WrapperQueueImpl.
117 public void setWaterMarkListener(WaterMarkListener waterMarkListener) {
118 this.waterMarkListener = waterMarkListener;
122 * @param harvesterHandle
// Stores the handle that push() pings.
124 public void setHarvesterHandle(HarvesterHandle harvesterHandle) {
125 this.harvesterHandle = harvesterHandle;