Fix race condition when registering services
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / OFStatisticsManager.java
index b63517b8add5d90a4df4aecd386c63767fd47e50..3ab38cc41f6db686b273864801c238f3cb36eb11 100644 (file)
@@ -36,6 +36,7 @@ import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply;
 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest;
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.NodeConnector;
 import org.opendaylight.controller.sal.core.Property;
@@ -97,7 +98,7 @@ IInventoryShimExternalListener, CommandProvider {
     private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
     // Per port sampled (every portStatsPeriod) transmit rate
     private Map<Long, Map<Short, TxRates>> txRates;
-    private Set<IOFStatisticsListener> statisticsListeners;
+    private Set<IOFStatisticsListener> statisticsListeners = new CopyOnWriteArraySet<IOFStatisticsListener>();
 
     /**
      * The object containing the latest factoredSamples tx rate samples for a
@@ -155,6 +156,32 @@ IInventoryShimExternalListener, CommandProvider {
         }
     }
 
+    private short getStatsQueueSize() {
+        String statsQueueSizeStr = System.getProperty("of.statsQueueSize");
+        short statsQueueSize = INITIAL_SIZE;
+        if (statsQueueSizeStr != null) {
+            try {
+                statsQueueSize = Short.parseShort(statsQueueSizeStr);
+                if (statsQueueSize <= 0) {
+                    statsQueueSize = INITIAL_SIZE;
+                }
+            } catch (Exception e) {
+            }
+        }
+        return statsQueueSize;
+    }
+
+    IPluginOutConnectionService connectionPluginOutService;
+    void setIPluginOutConnectionService(IPluginOutConnectionService s) {
+        connectionPluginOutService = s;
+    }
+
+    void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
+        if (connectionPluginOutService == s) {
+            connectionPluginOutService = null;
+        }
+    }
+
     /**
      * Function called by the dependency manager when all the required
      * dependencies are satisfied
@@ -166,17 +193,16 @@ IInventoryShimExternalListener, CommandProvider {
         portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
         tableStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
         dummyList = new ArrayList<OFStatistics>(1);
+        pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(getStatsQueueSize());
         statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(INITIAL_SIZE);
-        pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(INITIAL_SIZE);
         switchPortStatsUpdated = new LinkedBlockingQueue<Long>(INITIAL_SIZE);
         switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(INITIAL_SIZE);
         txRates = new HashMap<Long, Map<Short, TxRates>>(INITIAL_SIZE);
-        statisticsListeners = new CopyOnWriteArraySet<IOFStatisticsListener>();
 
         configStatsPollIntervals();
 
         // Initialize managed timers
-        statisticsTimer = new Timer();
+        statisticsTimer = new Timer("Statistics Timer Ticks");
         statisticsTimerTask = new TimerTask() {
             @Override
             public void run() {
@@ -195,6 +221,7 @@ IInventoryShimExternalListener, CommandProvider {
                     } catch (InterruptedException e) {
                         log.warn("Flow Statistics Collector thread "
                                 + "interrupted", e);
+                        return;
                     }
                 }
             }
@@ -210,6 +237,7 @@ IInventoryShimExternalListener, CommandProvider {
                         updatePortsTxRate(switchId);
                     } catch (InterruptedException e) {
                         log.warn("TX Rate Updater thread interrupted", e);
+                        return;
                     }
                 }
             }
@@ -223,6 +251,7 @@ IInventoryShimExternalListener, CommandProvider {
      *
      */
     void destroy() {
+        statisticsListeners.clear();
     }
 
     /**
@@ -537,11 +566,10 @@ IInventoryShimExternalListener, CommandProvider {
             values = this.v6StatsListToOFStatsList(values);
         }
 
-        if (!values.isEmpty()) { //possiblly filtered out by v6StatsListToOFStatsList()
-            for (IOFStatisticsListener l : this.statisticsListeners) {
-                l.flowStatisticsRefreshed(switchId, values);
-            }
+        for (IOFStatisticsListener l : this.statisticsListeners) {
+            l.flowStatisticsRefreshed(switchId, values);
         }
+
     }
 
     private void notifyPortUpdate(Long switchId, List<OFStatistics> values) {