2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.openflow.md.queue;
10 import java.util.Queue;
11 import java.util.concurrent.ArrayBlockingQueue;
12 import java.util.concurrent.BlockingQueue;
14 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
15 import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesZipper;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
20 import com.google.common.base.Preconditions;
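23 * {@link QueueKeeper} implementation for {@link OfHeader} messages, backed by two bounded queues (default and unordered) that are polled through a {@link PollableQueuesZipper}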
25 public class QueueKeeperFairImpl implements QueueKeeper<OfHeader> {
27 private static Logger LOG = LoggerFactory
28 .getLogger(QueueKeeperFairImpl.class);
30 private Queue<QueueItem<OfHeader>> queueDefault;
31 private BlockingQueue<QueueItem<OfHeader>> queueUnordered;
32 private AutoCloseable pollRegistration;
33 private int capacity = 5000;
34 private HarvesterHandle harvesterHandle;
35 private PollableQueuesZipper<QueueItem<OfHeader>> queueZipper;
// Releases this keeper by closing the poll registration.
// The Preconditions check fails with a NullPointerException ("pollRegistration not available")
// when no registration has been set; any exception thrown by the registration's close() propagates.
38 public void close() throws Exception {
39 Preconditions.checkNotNull(pollRegistration, "pollRegistration not available");
40 pollRegistration.close();
// (continuation of a method signature; its first lines are not part of this listing)
46 ConnectionConductor conductor,
47 QueueKeeper.QueueType queueType) {
// Bundle the message with its conductor and queue type into a single queue item.
48 QueueItemOFImpl qItem = new QueueItemOFImpl(message, conductor, queueType);
// Stays false unless one of the offer() calls below accepts the item.
49 boolean enqueued = false;
// offer() does not wait for space: it reports false when the bounded queue cannot take the item.
// NOTE(review): the branching that selects between the two queues is not visible here - presumably driven by queueType; confirm.
53 enqueued = queueDefault.offer(qItem);
56 enqueued = queueUnordered.offer(qItem);
// Logged for a queue type that neither queue handles; per the message text the item is dropped.
59 LOG.warn("unsupported queue type: [{}] -> dropping message [{}]", queueType, message.getImplementedInterface());
// NOTE(review): presumably executed only when enqueued is true, to notify the harvester that an item is waiting - confirm.
63 harvesterHandle.ping();
// NOTE(review): presumably the not-enqueued path. The text likely intends "is in use"; left untouched because it is runtime output.
65 LOG.debug("ingress throttling is use -> {}", queueType);
68 // if enqueueing fails -> message will be dropped
72 * @return the next queue item obtained from the queue zipper
// Fetches the next item by delegating to the zipper that combines both queues.
// NOTE(review): the return statement is not part of this listing; presumably nextQueueItem is returned unchanged - confirm.
75 public QueueItem<OfHeader> poll() {
76 QueueItem<OfHeader> nextQueueItem = queueZipper.poll();
81 * @param processingRegistration the processingRegistration to set
// Stores the registration that close() later closes; close() rejects a missing (null) registration.
84 public void setPollRegistration(AutoCloseable processingRegistration) {
85 this.pollRegistration = processingRegistration;
89 * @param capacity the capacity used for each of the internal queues when they are created
// Only records the value: the queues read it when they are constructed, so a change
// affects only queues created afterwards (an existing ArrayBlockingQueue keeps its size).
91 public void setCapacity(int capacity) {
92 this.capacity = capacity;
// (body of the queue set-up routine; its header is not part of this listing)
// Both queues are bounded by the same configured capacity.
99 queueUnordered = new ArrayBlockingQueue<>(capacity);
100 queueDefault = new ArrayBlockingQueue<>(capacity);
// The zipper is what poll() reads from; both queues are registered as its sources.
101 queueZipper = new PollableQueuesZipper<>();
// NOTE(review): default is added before unordered - whether source order influences polling order depends on PollableQueuesZipper; confirm.
102 queueZipper.addSource(queueDefault);
103 queueZipper.addSource(queueUnordered);
107 * @param harvesterHandle the handle whose {@code ping()} this keeper invokes on its enqueue path
// Injects the handle that this keeper pings (harvesterHandle.ping()) on its enqueue path.
109 public void setHarvesterHandle(HarvesterHandle harvesterHandle) {
110 this.harvesterHandle = harvesterHandle;