Bug 1764 - moved Session related interfaces to openflowplugin-api
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperHarvester.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.queue;
9
10 import java.util.Collection;
11
12 import org.opendaylight.openflowplugin.api.openflow.md.queue.Enqueuer;
13 import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
14 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
15 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 /**
20  * @param <IN>
21  *
22  */
23 public class QueueKeeperHarvester<IN> implements Runnable, HarvesterHandle {
24     private static Logger LOG = LoggerFactory.getLogger(QueueKeeperHarvester.class);
25
26     private final Collection<QueueKeeper<IN>> messageSources;
27     private final Enqueuer<QueueItem<IN>> enqueuer;
28     private final Object harvestLock = new Object();
29     private volatile boolean finishing = false;
30     private volatile boolean wakeMe = false;
31
32     /**
33      * @param enqueuer
34      * @param messageSources
35      * @param harvestLock
36      */
37     public QueueKeeperHarvester(final Enqueuer<QueueItem<IN>> enqueuer,
38             final Collection<QueueKeeper<IN>> messageSources) {
39         this.enqueuer = enqueuer;
40         this.messageSources = messageSources;
41     }
42
43     @Override
44     public void run() {
45         while (!finishing) {
46             boolean starving = true;
47             for (QueueKeeper<IN> source : messageSources) {
48                 QueueItem<IN> qItem = source.poll();
49                 if (qItem != null) {
50                     starving = false;
51                     enqueuer.enqueueQueueItem(qItem);
52                 }
53             }
54
55             if (starving) {
56                 LOG.trace("messageHarvester is about to make a starve sleep");
57                 synchronized (harvestLock) {
58                     wakeMe = true;
59                     try {
60                         this.harvestLock.wait();
61                         LOG.trace("messageHarvester is waking up from a starve sleep");
62                     } catch (InterruptedException e) {
63                         LOG.warn("message harvester has been interrupted during starve sleep", e);
64                     } finally {
65                         wakeMe = false;
66                     }
67                 }
68             }
69         }
70     }
71
72     /**
73      * finish harvester
74      */
75     public void shutdown() {
76         this.finishing = true;
77         ping();
78     }
79
80     @Override
81     public void ping() {
82         if (wakeMe) {
83             synchronized (harvestLock) {
84                 // Might've raced while waiting for lock, so need to recheck
85                 if (wakeMe) {
86                     LOG.debug("pinging message harvester in starve status");
87                     harvestLock.notify();
88                 }
89             }
90         }
91     }
92 }