BUG-1455: improve thread safety/performance
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / queue / QueueKeeperHarvester.java
index e362aeaa6d430e71614db17736a65a6b678bb518..2b634bcb57e6b955f030ddf7b429e865ea087498 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- * 
+ *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
@@ -13,37 +13,33 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * @param <IN> 
- * 
+ * @param <IN>
+ *
  */
 public class QueueKeeperHarvester<IN> implements Runnable, HarvesterHandle {
     private static Logger LOG = LoggerFactory.getLogger(QueueKeeperHarvester.class);
 
-    private Enqueuer<QueueItem<IN>> enqueuer;
-    private Collection<QueueKeeper<IN>> messageSources;
+    private final Collection<QueueKeeper<IN>> messageSources;
+    private final Enqueuer<QueueItem<IN>> enqueuer;
+    private final Object harvestLock = new Object();
+    private volatile boolean finishing = false;
+    private volatile boolean wakeMe = false;
 
-    private boolean finishing = false;
-    private boolean starving;
-    
-    private Object harvestLock;
-
-    
     /**
      * @param enqueuer
      * @param messageSources
-     * @param harvestLock 
+     * @param harvestLock
      */
-    public QueueKeeperHarvester(Enqueuer<QueueItem<IN>> enqueuer,
-            Collection<QueueKeeper<IN>> messageSources) {
+    public QueueKeeperHarvester(final Enqueuer<QueueItem<IN>> enqueuer,
+            final Collection<QueueKeeper<IN>> messageSources) {
         this.enqueuer = enqueuer;
         this.messageSources = messageSources;
-        harvestLock = new Object();
     }
 
     @Override
     public void run() {
-        while (! finishing ) {
-            starving = true;
+        while (!finishing) {
+            boolean starving = true;
             for (QueueKeeper<IN> source : messageSources) {
                 QueueItem<IN> qItem = source.poll();
                 if (qItem != null) {
@@ -51,40 +47,42 @@ public class QueueKeeperHarvester<IN> implements Runnable, HarvesterHandle {
                     enqueuer.enqueueQueueItem(qItem);
                 }
             }
-            
+
             if (starving) {
+                LOG.trace("messageHarvester is about to make a starve sleep");
                 synchronized (harvestLock) {
+                    wakeMe = true;
                     try {
-                        if (starving) {
-                            LOG.trace("messageHarvester is about to make a starve sleep");
-                            harvestLock.wait();
-                            LOG.trace("messageHarvester is waking up from a starve sleep");
-                        }
+                        this.harvestLock.wait();
+                        LOG.trace("messageHarvester is waking up from a starve sleep");
                     } catch (InterruptedException e) {
                         LOG.warn("message harvester has been interrupted during starve sleep", e);
+                    } finally {
+                        wakeMe = false;
                     }
                 }
             }
         }
     }
-    
+
     /**
      * finish harvester
      */
     public void shutdown() {
         this.finishing = true;
+        ping();
     }
-    
+
     @Override
     public void ping() {
-        if (starving) {
-            LOG.debug("pinging message harvester in starve status");
+        if (wakeMe) {
             synchronized (harvestLock) {
-                if (starving) {
-                    starving = false;
+                // Might've raced while waiting for lock, so need to recheck
+                if (wakeMe) {
+                    LOG.debug("pinging message harvester in starve status");
                     harvestLock.notify();
                 }
             }
         }
     }
-}
+}
\ No newline at end of file