BUG-1455: improve thread safety/performance
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperHarvester.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.queue;
9
10 import java.util.Collection;
11
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14
15 /**
16  * @param <IN>
17  *
18  */
19 public class QueueKeeperHarvester<IN> implements Runnable, HarvesterHandle {
20     private static Logger LOG = LoggerFactory.getLogger(QueueKeeperHarvester.class);
21
22     private final Collection<QueueKeeper<IN>> messageSources;
23     private final Enqueuer<QueueItem<IN>> enqueuer;
24     private final Object harvestLock = new Object();
25     private volatile boolean finishing = false;
26     private volatile boolean wakeMe = false;
27
28     /**
29      * @param enqueuer
30      * @param messageSources
31      * @param harvestLock
32      */
33     public QueueKeeperHarvester(final Enqueuer<QueueItem<IN>> enqueuer,
34             final Collection<QueueKeeper<IN>> messageSources) {
35         this.enqueuer = enqueuer;
36         this.messageSources = messageSources;
37     }
38
39     @Override
40     public void run() {
41         while (!finishing) {
42             boolean starving = true;
43             for (QueueKeeper<IN> source : messageSources) {
44                 QueueItem<IN> qItem = source.poll();
45                 if (qItem != null) {
46                     starving = false;
47                     enqueuer.enqueueQueueItem(qItem);
48                 }
49             }
50
51             if (starving) {
52                 LOG.trace("messageHarvester is about to make a starve sleep");
53                 synchronized (harvestLock) {
54                     wakeMe = true;
55                     try {
56                         this.harvestLock.wait();
57                         LOG.trace("messageHarvester is waking up from a starve sleep");
58                     } catch (InterruptedException e) {
59                         LOG.warn("message harvester has been interrupted during starve sleep", e);
60                     } finally {
61                         wakeMe = false;
62                     }
63                 }
64             }
65         }
66     }
67
68     /**
69      * finish harvester
70      */
71     public void shutdown() {
72         this.finishing = true;
73         ping();
74     }
75
76     @Override
77     public void ping() {
78         if (wakeMe) {
79             synchronized (harvestLock) {
80                 // Might've raced while waiting for lock, so need to recheck
81                 if (wakeMe) {
82                     LOG.debug("pinging message harvester in starve status");
83                     harvestLock.notify();
84                 }
85             }
86         }
87     }
88 }