2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.openflow.md.queue;
10 import java.util.Queue;
11 import java.util.concurrent.ArrayBlockingQueue;
12 import java.util.concurrent.BlockingQueue;
14 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
15 import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
16 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
17 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
18 import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesZipper;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
23 import com.google.common.base.Preconditions;
26 * QueueKeeper implementation based on {@link OfHeader}
28 public class QueueKeeperFairImpl implements QueueKeeper<OfHeader> {
30 private static Logger LOG = LoggerFactory
31 .getLogger(QueueKeeperFairImpl.class);
33 private Queue<QueueItem<OfHeader>> queueDefault;
34 private BlockingQueue<QueueItem<OfHeader>> queueUnordered;
35 private AutoCloseable pollRegistration;
36 private int capacity = 5000;
37 private HarvesterHandle harvesterHandle;
38 private PollableQueuesZipper<QueueItem<OfHeader>> queueZipper;
41 public void close() throws Exception {
42 Preconditions.checkNotNull(pollRegistration, "pollRegistration not available");
43 pollRegistration.close();
49 ConnectionConductor conductor,
50 QueueKeeper.QueueType queueType) {
51 QueueItemOFImpl qItem = new QueueItemOFImpl(message, conductor, queueType);
52 boolean enqueued = false;
56 enqueued = queueDefault.offer(qItem);
59 enqueued = queueUnordered.offer(qItem);
62 LOG.warn("unsupported queue type: [{}] -> dropping message [{}]", queueType, message.getImplementedInterface());
66 harvesterHandle.ping();
68 LOG.debug("ingress throttling is use -> {}", queueType);
71 // if enqueueing fails -> message will be dropped
75 * @return the next queue item polled from the queue zipper
78 public QueueItem<OfHeader> poll() {
79 QueueItem<OfHeader> nextQueueItem = queueZipper.poll();
84 * @param processingRegistration the processingRegistration to set
87 public void setPollRegistration(AutoCloseable processingRegistration) {
88 this.pollRegistration = processingRegistration;
92 * @param capacity the capacity of internal blocking queue
94 public void setCapacity(int capacity) {
95 this.capacity = capacity;
102 queueUnordered = new ArrayBlockingQueue<>(capacity);
103 queueDefault = new ArrayBlockingQueue<>(capacity);
104 queueZipper = new PollableQueuesZipper<>();
105 queueZipper.addSource(queueDefault);
106 queueZipper.addSource(queueUnordered);
110 * @param harvesterHandle the harvesterHandle to set
112 public void setHarvesterHandle(HarvesterHandle harvesterHandle) {
113 this.harvesterHandle = harvesterHandle;