Fix Jdk8 compatibility
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperHarvester.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.queue;
9
10 import java.util.Collection;
11
12 import org.opendaylight.openflowplugin.api.openflow.md.queue.Enqueuer;
13 import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
14 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
15 import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 /**
20  * @param <IN>
21  *
22  */
23 public class QueueKeeperHarvester<IN> implements Runnable, HarvesterHandle {
24     private static Logger LOG = LoggerFactory.getLogger(QueueKeeperHarvester.class);
25
26     private final Collection<QueueKeeper<IN>> messageSources;
27     private final Enqueuer<QueueItem<IN>> enqueuer;
28     private final Object harvestLock = new Object();
29     private volatile boolean finishing = false;
30     private volatile boolean wakeMe = false;
31
32     /**
33      * @param enqueuer
34      * @param messageSources
35      */
36     public QueueKeeperHarvester(final Enqueuer<QueueItem<IN>> enqueuer,
37             final Collection<QueueKeeper<IN>> messageSources) {
38         this.enqueuer = enqueuer;
39         this.messageSources = messageSources;
40     }
41
42     @Override
43     public void run() {
44         while (!finishing) {
45             boolean starving = true;
46             for (QueueKeeper<IN> source : messageSources) {
47                 QueueItem<IN> qItem = source.poll();
48                 if (qItem != null) {
49                     starving = false;
50                     enqueuer.enqueueQueueItem(qItem);
51                 }
52             }
53
54             if (starving) {
55                 LOG.trace("messageHarvester is about to make a starve sleep");
56                 synchronized (harvestLock) {
57                     wakeMe = true;
58                     try {
59                         this.harvestLock.wait();
60                         LOG.trace("messageHarvester is waking up from a starve sleep");
61                     } catch (InterruptedException e) {
62                         LOG.warn("message harvester has been interrupted during starve sleep", e);
63                     } finally {
64                         wakeMe = false;
65                     }
66                 }
67             }
68         }
69     }
70
71     /**
72      * finish harvester
73      */
74     public void shutdown() {
75         this.finishing = true;
76         ping();
77     }
78
79     @Override
80     public void ping() {
81         if (wakeMe) {
82             synchronized (harvestLock) {
83                 // Might've raced while waiting for lock, so need to recheck
84                 if (wakeMe) {
85                     LOG.debug("pinging message harvester in starve status");
86                     harvestLock.notify();
87                 }
88             }
89         }
90     }
91 }