Bug 2446 - High priority (control) queue stops reading from the channel if it is full
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperFairImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * 
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.queue;
9
10 import java.util.Queue;
11 import java.util.concurrent.ArrayBlockingQueue;
12 import java.util.concurrent.BlockingQueue;
13
14 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
15 import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
16 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
17 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
18 import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener;
19 import org.opendaylight.openflowplugin.api.openflow.md.util.PollableQueuesPriorityZipper;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 import com.google.common.base.Preconditions;
25
26 /**
27  * QueueKeeper implementation based on {@link OfHeader}
28  */
29 public class QueueKeeperFairImpl implements QueueKeeper<OfHeader> {
30
31     private static Logger LOG = LoggerFactory
32             .getLogger(QueueKeeperFairImpl.class);
33
34     private Queue<QueueItem<OfHeader>> queueDefault;
35     private BlockingQueue<QueueItem<OfHeader>> queueUnordered;
36     private AutoCloseable pollRegistration;
37     private int capacity = 5000;
38     private HarvesterHandle harvesterHandle;
39     private PollableQueuesPriorityZipper<QueueItem<OfHeader>> queueZipper;
40
41     private WaterMarkListener waterMarkListener;
42
43     @Override
44     public void close() throws Exception {
45         Preconditions.checkNotNull(pollRegistration,
46                 "pollRegistration not available");
47         pollRegistration.close();
48     }
49
50     @Override
51     public void push(OfHeader message, ConnectionConductor conductor,
52             QueueKeeper.QueueType queueType) {
53         QueueItemOFImpl qItem = new QueueItemOFImpl(message, conductor,
54                 queueType);
55         boolean enqueued = false;
56
57         switch (queueType) {
58         case DEFAULT:
59             enqueued = queueDefault.offer(qItem);
60             break;
61         case UNORDERED:
62             enqueued = queueUnordered.offer(qItem);
63             break;
64         default:
65             LOG.warn("unsupported queue type: [{}] -> dropping message [{}]",
66                     queueType, message.getImplementedInterface());
67         }
68
69         if (enqueued) {
70             harvesterHandle.ping();
71         } else {
72             LOG.debug("ingress throttling is use -> {}", queueType);
73         }
74
75         // if enqueueing fails -> message will be dropped
76     }
77
78     /**
79      * @return the ingressQueue
80      */
81     @Override
82     public QueueItem<OfHeader> poll() {
83         return queueZipper.poll();
84     }
85
86     /**
87      * @param processingRegistration
88      *            the processingRegistration to set
89      */
90     @Override
91     public void setPollRegistration(AutoCloseable processingRegistration) {
92         this.pollRegistration = processingRegistration;
93     }
94
95     /**
96      * @param capacity
97      *            the capacity of internal blocking queue
98      */
99     public void setCapacity(int capacity) {
100         this.capacity = capacity;
101     }
102
103     /**
104      * init blocking queue
105      */
106     public void init() {
107         Preconditions.checkNotNull(waterMarkListener);
108         queueUnordered = new ArrayBlockingQueue<>(capacity);
109         queueDefault = new ArrayBlockingQueue<>(capacity);
110         WrapperQueueImpl<QueueItem<OfHeader>> wrapperQueue = new WrapperQueueImpl<>(
111                 capacity, queueDefault, waterMarkListener);
112         queueZipper = new PollableQueuesPriorityZipper<>();
113         queueZipper.addSource(queueUnordered);
114         queueZipper.setPrioritizedSource(wrapperQueue);
115     }
116
117     public void setWaterMarkListener(WaterMarkListener waterMarkListener) {
118         this.waterMarkListener = waterMarkListener;
119     }
120
121     /**
122      * @param harvesterHandle
123      */
124     public void setHarvesterHandle(HarvesterHandle harvesterHandle) {
125         this.harvesterHandle = harvesterHandle;
126     }
127 }