Bug 1764 - moved Session related interfaces to openflowplugin-api
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperHarvester.java
index e362aeaa6d430e71614db17736a65a6b678bb518..6a25786d749698a83bf25449785a7c551de477ab 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- * 
+ *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
@@ -9,41 +9,41 @@ package org.opendaylight.openflowplugin.openflow.md.queue;
 
 import java.util.Collection;
 
+import org.opendaylight.openflowplugin.api.openflow.md.queue.Enqueuer;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.HarvesterHandle;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueItem;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * @param <IN> 
- * 
+ * @param <IN>
+ *
  */
 public class QueueKeeperHarvester<IN> implements Runnable, HarvesterHandle {
     private static Logger LOG = LoggerFactory.getLogger(QueueKeeperHarvester.class);
 
-    private Enqueuer<QueueItem<IN>> enqueuer;
-    private Collection<QueueKeeper<IN>> messageSources;
+    private final Collection<QueueKeeper<IN>> messageSources;
+    private final Enqueuer<QueueItem<IN>> enqueuer;
+    private final Object harvestLock = new Object();
+    private volatile boolean finishing = false;
+    private volatile boolean wakeMe = false;
 
-    private boolean finishing = false;
-    private boolean starving;
-    
-    private Object harvestLock;
-
-    
     /**
      * @param enqueuer
      * @param messageSources
-     * @param harvestLock 
+     * @param harvestLock
      */
-    public QueueKeeperHarvester(Enqueuer<QueueItem<IN>> enqueuer,
-            Collection<QueueKeeper<IN>> messageSources) {
+    public QueueKeeperHarvester(final Enqueuer<QueueItem<IN>> enqueuer,
+            final Collection<QueueKeeper<IN>> messageSources) {
         this.enqueuer = enqueuer;
         this.messageSources = messageSources;
-        harvestLock = new Object();
     }
 
     @Override
     public void run() {
-        while (! finishing ) {
-            starving = true;
+        while (!finishing) {
+            boolean starving = true;
             for (QueueKeeper<IN> source : messageSources) {
                 QueueItem<IN> qItem = source.poll();
                 if (qItem != null) {
@@ -51,40 +51,42 @@ public class QueueKeeperHarvester<IN> implements Runnable, HarvesterHandle {
                     enqueuer.enqueueQueueItem(qItem);
                 }
             }
-            
+
             if (starving) {
+                LOG.trace("messageHarvester is about to make a starve sleep");
                 synchronized (harvestLock) {
+                    wakeMe = true;
                     try {
-                        if (starving) {
-                            LOG.trace("messageHarvester is about to make a starve sleep");
-                            harvestLock.wait();
-                            LOG.trace("messageHarvester is waking up from a starve sleep");
-                        }
+                        this.harvestLock.wait();
+                        LOG.trace("messageHarvester is waking up from a starve sleep");
                     } catch (InterruptedException e) {
                         LOG.warn("message harvester has been interrupted during starve sleep", e);
+                    } finally {
+                        wakeMe = false;
                     }
                 }
             }
         }
     }
-    
+
     /**
      * finish harvester
      */
     public void shutdown() {
         this.finishing = true;
+        ping();
     }
-    
+
     @Override
     public void ping() {
-        if (starving) {
-            LOG.debug("pinging message harvester in starve status");
+        if (wakeMe) {
             synchronized (harvestLock) {
-                if (starving) {
-                    starving = false;
+                // Might've raced while waiting for lock, so need to recheck
+                if (wakeMe) {
+                    LOG.debug("pinging message harvester in starve status");
                     harvestLock.notify();
                 }
             }
         }
     }
-}
+}
\ No newline at end of file