ee74527d328d86390f4010452c25f78f779bcab3
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperFairImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * 
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.queue;
9
10 import java.util.Queue;
11 import java.util.concurrent.ArrayBlockingQueue;
12 import java.util.concurrent.BlockingQueue;
13
14 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
15 import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesZipper;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 import com.google.common.base.Preconditions;
21
22 /**
23  * QueueKeeper implementation based on {@link OfHeader} 
24  */
25 public class QueueKeeperFairImpl implements QueueKeeper<OfHeader> {
26
27     private static Logger LOG = LoggerFactory
28             .getLogger(QueueKeeperFairImpl.class);
29     
30     private Queue<QueueItem<OfHeader>> queueDefault;
31     private BlockingQueue<QueueItem<OfHeader>> queueUnordered;
32     private AutoCloseable pollRegistration;
33     private int capacity = 5000;
34     private HarvesterHandle harvesterHandle;
35     private PollableQueuesZipper<QueueItem<OfHeader>> queueZipper;
36
37     @Override
38     public void close() throws Exception {
39         Preconditions.checkNotNull(pollRegistration, "pollRegistration not available");
40         pollRegistration.close();
41     }
42
43     @Override
44     public void push(
45             OfHeader message,
46             ConnectionConductor conductor,
47             QueueKeeper.QueueType queueType) {
48         QueueItemOFImpl qItem = new QueueItemOFImpl(message, conductor, queueType);
49         boolean enqueued = false;
50         
51         switch (queueType) {
52         case DEFAULT:
53             enqueued = queueDefault.offer(qItem);
54             break;
55         case UNORDERED:
56             enqueued = queueUnordered.offer(qItem);
57             break;
58         default:
59             LOG.warn("unsupported queue type: [{}] -> dropping message [{}]", queueType, message.getImplementedInterface());
60         }
61         
62         if (enqueued) {
63             harvesterHandle.ping();
64         } else {
65             LOG.debug("ingress throttling is use -> {}", queueType);
66         }
67         
68         // if enqueueing fails -> message will be dropped 
69     }
70     
71     /**
72      * @return the ingressQueue
73      */
74     @Override
75     public QueueItem<OfHeader> poll() {
76         QueueItem<OfHeader> nextQueueItem = queueZipper.poll();
77         return nextQueueItem;
78     }
79
80     /**
81      * @param processingRegistration the processingRegistration to set
82      */
83     @Override
84     public void setPollRegistration(AutoCloseable processingRegistration) {
85         this.pollRegistration = processingRegistration;
86     }
87     
88     /**
89      * @param capacity the capacity of internal blocking queue
90      */
91     public void setCapacity(int capacity) {
92         this.capacity = capacity;
93     }
94     
95     /**
96      * init blocking queue
97      */
98     public void init() {
99         queueUnordered = new ArrayBlockingQueue<>(capacity);
100         queueDefault = new ArrayBlockingQueue<>(capacity);
101         queueZipper = new PollableQueuesZipper<>();
102         queueZipper.addSource(queueDefault);
103         queueZipper.addSource(queueUnordered);
104     }
105
106     /**
107      * @param harvesterHandle
108      */
109     public void setHarvesterHandle(HarvesterHandle harvesterHandle) {
110         this.harvesterHandle = harvesterHandle;
111     }
112 }