04b708416c0911ec82b8f70520b7b6f3f5106928
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperFairImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * 
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.queue;
9
10 import java.util.Queue;
11 import java.util.concurrent.ArrayBlockingQueue;
12 import java.util.concurrent.BlockingQueue;
13
14 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
15 import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
16 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
17 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
18 import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesZipper;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 import com.google.common.base.Preconditions;
24
25 /**
26  * QueueKeeper implementation based on {@link OfHeader} 
27  */
28 public class QueueKeeperFairImpl implements QueueKeeper<OfHeader> {
29
30     private static Logger LOG = LoggerFactory
31             .getLogger(QueueKeeperFairImpl.class);
32     
33     private Queue<QueueItem<OfHeader>> queueDefault;
34     private BlockingQueue<QueueItem<OfHeader>> queueUnordered;
35     private AutoCloseable pollRegistration;
36     private int capacity = 5000;
37     private HarvesterHandle harvesterHandle;
38     private PollableQueuesZipper<QueueItem<OfHeader>> queueZipper;
39
40     @Override
41     public void close() throws Exception {
42         Preconditions.checkNotNull(pollRegistration, "pollRegistration not available");
43         pollRegistration.close();
44     }
45
46     @Override
47     public void push(
48             OfHeader message,
49             ConnectionConductor conductor,
50             QueueKeeper.QueueType queueType) {
51         QueueItemOFImpl qItem = new QueueItemOFImpl(message, conductor, queueType);
52         boolean enqueued = false;
53         
54         switch (queueType) {
55         case DEFAULT:
56             enqueued = queueDefault.offer(qItem);
57             break;
58         case UNORDERED:
59             enqueued = queueUnordered.offer(qItem);
60             break;
61         default:
62             LOG.warn("unsupported queue type: [{}] -> dropping message [{}]", queueType, message.getImplementedInterface());
63         }
64         
65         if (enqueued) {
66             harvesterHandle.ping();
67         } else {
68             LOG.debug("ingress throttling is use -> {}", queueType);
69         }
70         
71         // if enqueueing fails -> message will be dropped 
72     }
73     
74     /**
75      * @return the ingressQueue
76      */
77     @Override
78     public QueueItem<OfHeader> poll() {
79         QueueItem<OfHeader> nextQueueItem = queueZipper.poll();
80         return nextQueueItem;
81     }
82
83     /**
84      * @param processingRegistration the processingRegistration to set
85      */
86     @Override
87     public void setPollRegistration(AutoCloseable processingRegistration) {
88         this.pollRegistration = processingRegistration;
89     }
90     
91     /**
92      * @param capacity the capacity of internal blocking queue
93      */
94     public void setCapacity(int capacity) {
95         this.capacity = capacity;
96     }
97     
98     /**
99      * init blocking queue
100      */
101     public void init() {
102         queueUnordered = new ArrayBlockingQueue<>(capacity);
103         queueDefault = new ArrayBlockingQueue<>(capacity);
104         queueZipper = new PollableQueuesZipper<>();
105         queueZipper.addSource(queueDefault);
106         queueZipper.addSource(queueUnordered);
107     }
108
109     /**
110      * @param harvesterHandle
111      */
112     public void setHarvesterHandle(HarvesterHandle harvesterHandle) {
113         this.harvesterHandle = harvesterHandle;
114     }
115 }