BUG-1075: ingress back pressure
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperHarvester.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * 
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.queue;
9
10 import java.util.Collection;
11
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14
15 /**
16  * @param <IN> 
17  * 
18  */
19 public class QueueKeeperHarvester<IN> implements Runnable, HarvesterHandle {
20     private static Logger LOG = LoggerFactory.getLogger(QueueKeeperHarvester.class);
21
22     private Enqueuer<QueueItem<IN>> enqueuer;
23     private Collection<QueueKeeper<IN>> messageSources;
24
25     private boolean finishing = false;
26     private boolean starving;
27     
28     private Object harvestLock;
29
30     
31     /**
32      * @param enqueuer
33      * @param messageSources
34      * @param harvestLock 
35      */
36     public QueueKeeperHarvester(Enqueuer<QueueItem<IN>> enqueuer,
37             Collection<QueueKeeper<IN>> messageSources) {
38         this.enqueuer = enqueuer;
39         this.messageSources = messageSources;
40         harvestLock = new Object();
41     }
42
43     @Override
44     public void run() {
45         while (! finishing ) {
46             starving = true;
47             for (QueueKeeper<IN> source : messageSources) {
48                 QueueItem<IN> qItem = source.poll();
49                 if (qItem != null) {
50                     starving = false;
51                     enqueuer.enqueueQueueItem(qItem);
52                 }
53             }
54             
55             if (starving) {
56                 synchronized (harvestLock) {
57                     try {
58                         if (starving) {
59                             LOG.trace("messageHarvester is about to make a starve sleep");
60                             harvestLock.wait();
61                             LOG.trace("messageHarvester is waking up from a starve sleep");
62                         }
63                     } catch (InterruptedException e) {
64                         LOG.warn("message harvester has been interrupted during starve sleep", e);
65                     }
66                 }
67             }
68         }
69     }
70     
71     /**
72      * finish harvester
73      */
74     public void shutdown() {
75         this.finishing = true;
76     }
77     
78     @Override
79     public void ping() {
80         if (starving) {
81             LOG.debug("pinging message harvester in starve status");
82             synchronized (harvestLock) {
83                 if (starving) {
84                     starving = false;
85                     harvestLock.notify();
86                 }
87             }
88         }
89     }
90 }