BUG-1455: improve thread safety/performance 08/9508/5
authorVaclav Demcak <vdemcak@cisco.com>
Mon, 11 Aug 2014 14:49:03 +0000 (16:49 +0200)
committermichal rehak <mirehak@cisco.com>
Tue, 12 Aug 2014 17:12:30 +0000 (17:12 +0000)
There were couple of things wrong with this code: flipping 'starving'
field needlessly, which could end up in needless calls to notify().
Split that field into local state and volatile status indicator.

finished is not volatile, so the checks can actually get removed at
runtime, breaking the shutdown() functionality. Also an explicit ping()
is needed to wake a starving harvester.

Change-Id: I58b46a3f2e593e3873c47c30d2fe0d22d4304e8f
Signed-off-by: Robert Varga <rovarga@cisco.com>
Signed-off-by: Vaclav Demcak <vdemcak@cisco.com>
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperHarvester.java

index e362aeaa6d430e71614db17736a65a6b678bb518..2b634bcb57e6b955f030ddf7b429e865ea087498 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- * 
+ *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
@@ -13,37 +13,33 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * @param <IN> 
- * 
+ * @param <IN>
+ *
  */
 public class QueueKeeperHarvester<IN> implements Runnable, HarvesterHandle {
     private static Logger LOG = LoggerFactory.getLogger(QueueKeeperHarvester.class);
 
-    private Enqueuer<QueueItem<IN>> enqueuer;
-    private Collection<QueueKeeper<IN>> messageSources;
+    private final Collection<QueueKeeper<IN>> messageSources;
+    private final Enqueuer<QueueItem<IN>> enqueuer;
+    private final Object harvestLock = new Object();
+    private volatile boolean finishing = false;
+    private volatile boolean wakeMe = false;
 
-    private boolean finishing = false;
-    private boolean starving;
-    
-    private Object harvestLock;
-
-    
     /**
      * @param enqueuer
      * @param messageSources
-     * @param harvestLock 
+     * @param harvestLock
      */
-    public QueueKeeperHarvester(Enqueuer<QueueItem<IN>> enqueuer,
-            Collection<QueueKeeper<IN>> messageSources) {
+    public QueueKeeperHarvester(final Enqueuer<QueueItem<IN>> enqueuer,
+            final Collection<QueueKeeper<IN>> messageSources) {
         this.enqueuer = enqueuer;
         this.messageSources = messageSources;
-        harvestLock = new Object();
     }
 
     @Override
     public void run() {
-        while (! finishing ) {
-            starving = true;
+        while (!finishing) {
+            boolean starving = true;
             for (QueueKeeper<IN> source : messageSources) {
                 QueueItem<IN> qItem = source.poll();
                 if (qItem != null) {
@@ -51,40 +47,42 @@ public class QueueKeeperHarvester<IN> implements Runnable, HarvesterHandle {
                     enqueuer.enqueueQueueItem(qItem);
                 }
             }
-            
+
             if (starving) {
+                LOG.trace("messageHarvester is about to make a starve sleep");
                 synchronized (harvestLock) {
+                    wakeMe = true;
                     try {
-                        if (starving) {
-                            LOG.trace("messageHarvester is about to make a starve sleep");
-                            harvestLock.wait();
-                            LOG.trace("messageHarvester is waking up from a starve sleep");
-                        }
+                        this.harvestLock.wait();
+                        LOG.trace("messageHarvester is waking up from a starve sleep");
                     } catch (InterruptedException e) {
                         LOG.warn("message harvester has been interrupted during starve sleep", e);
+                    } finally {
+                        wakeMe = false;
                     }
                 }
             }
         }
     }
-    
+
     /**
      * finish harvester
      */
     public void shutdown() {
         this.finishing = true;
+        ping();
     }
-    
+
     @Override
     public void ping() {
-        if (starving) {
-            LOG.debug("pinging message harvester in starve status");
+        if (wakeMe) {
             synchronized (harvestLock) {
-                if (starving) {
-                    starving = false;
+                // Might've raced while waiting for lock, so need to recheck
+                if (wakeMe) {
+                    LOG.debug("pinging message harvester in starve status");
                     harvestLock.notify();
                 }
             }
         }
     }
-}
+}
\ No newline at end of file