Bug fix: flow statistics are not notified if empty
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / OFStatisticsManager.java
index 78fddc773637c8aefb338b729d4e416e66e85441..3c02c1762875eceb1ba226f8b333584af9b852bf 100644 (file)
@@ -13,7 +13,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -23,14 +22,15 @@ import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.eclipse.osgi.framework.console.CommandInterpreter;
 import org.eclipse.osgi.framework.console.CommandProvider;
 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
+import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsListener;
 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
-import org.opendaylight.controller.protocol_plugin.openflow.IStatisticsListener;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
@@ -46,7 +46,6 @@ import org.openflow.protocol.OFMatch;
 import org.openflow.protocol.OFPort;
 import org.openflow.protocol.OFStatisticsRequest;
 import org.openflow.protocol.statistics.OFAggregateStatisticsRequest;
-import org.openflow.protocol.statistics.OFDescriptionStatistics;
 import org.openflow.protocol.statistics.OFFlowStatisticsReply;
 import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
 import org.openflow.protocol.statistics.OFPortStatisticsReply;
@@ -69,18 +68,17 @@ import org.slf4j.LoggerFactory;
  */
 public class OFStatisticsManager implements IOFStatisticsManager,
 IInventoryShimExternalListener, CommandProvider {
-    private static final Logger log = LoggerFactory
-            .getLogger(OFStatisticsManager.class);
-    private static final int initialSize = 64;
-    private static final long flowStatsPeriod = 10000;
-    private static final long descriptionStatsPeriod = 60000;
-    private static final long portStatsPeriod = 5000;
-    private static final long tableStatsPeriod = 10000;
-    private static final long tickPeriod = 1000;
-    private static short statisticsTickNumber = (short) (flowStatsPeriod / tickPeriod);
-    private static short descriptionTickNumber = (short) (descriptionStatsPeriod / tickPeriod);
-    private static short portTickNumber = (short) (portStatsPeriod / tickPeriod);
-    private static short tableTickNumber = (short) (tableStatsPeriod / tickPeriod);
+    private static final Logger log = LoggerFactory.getLogger(OFStatisticsManager.class);
+    private static final int INITIAL_SIZE = 64;
+    private static final long FLOW_STATS_PERIOD = 10000;
+    private static final long DESC_STATS_PERIOD = 60000;
+    private static final long PORT_STATS_PERIOD = 5000;
+    private static final long TABLE_STATS_PERIOD = 10000;
+    private static final long TICK = 1000;
+    private static short statisticsTickNumber = (short) (FLOW_STATS_PERIOD / TICK);
+    private static short descriptionTickNumber = (short) (DESC_STATS_PERIOD / TICK);
+    private static short portTickNumber = (short) (PORT_STATS_PERIOD / TICK);
+    private static short tableTickNumber = (short) (TABLE_STATS_PERIOD / TICK);
     private static short factoredSamples = (short) 2;
     private static short counter = 1;
     private IController controller = null;
@@ -97,18 +95,17 @@ IInventoryShimExternalListener, CommandProvider {
     private Timer statisticsTimer;
     private TimerTask statisticsTimerTask;
     private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
-    private Map<Long, Map<Short, TxRates>> txRates; // Per port sampled (every
-                                                    // portStatsPeriod) transmit
-                                                    // rate
-    private Set<IStatisticsListener> descriptionListeners;
+    // Per port sampled (every portStatsPeriod) transmit rate
+    private Map<Long, Map<Short, TxRates>> txRates;
+    private Set<IOFStatisticsListener> statisticsListeners;
 
     /**
      * The object containing the latest factoredSamples tx rate samples for a
      * given switch port
      */
     protected class TxRates {
-        Deque<Long> sampledTxBytes; // contains the latest factoredSamples
-                                    // sampled transmitted bytes
+        // contains the latest factoredSamples sampled transmitted bytes
+        Deque<Long> sampledTxBytes;
 
         public TxRates() {
             sampledTxBytes = new LinkedBlockingDeque<Long>();
@@ -129,7 +126,7 @@ IInventoryShimExternalListener, CommandProvider {
 
         /**
          * Returns the average transmit rate in bps
-         * 
+         *
          * @return the average transmit rate [bps]
          */
         public long getAverageTxRate() {
@@ -140,10 +137,9 @@ IInventoryShimExternalListener, CommandProvider {
             if (sampledTxBytes.size() < factoredSamples) {
                 return average;
             }
-            long increment = (long) (sampledTxBytes.getFirst() - sampledTxBytes
-                    .getLast());
-            long timePeriod = (long) (factoredSamples * portStatsPeriod)
-                    / (long) tickPeriod;
+            long increment = sampledTxBytes.getFirst() - sampledTxBytes
+                    .getLast();
+            long timePeriod = factoredSamples * PORT_STATS_PERIOD / TICK;
             average = (8L * increment) / timePeriod;
             return average;
         }
@@ -162,7 +158,7 @@ IInventoryShimExternalListener, CommandProvider {
     /**
      * Function called by the dependency manager when all the required
      * dependencies are satisfied
-     * 
+     *
      */
     void init() {
         flowStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
@@ -170,15 +166,12 @@ IInventoryShimExternalListener, CommandProvider {
         portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
         tableStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
         dummyList = new ArrayList<OFStatistics>(1);
-        statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(
-                initialSize);
-        pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(
-                initialSize);
-        switchPortStatsUpdated = new LinkedBlockingQueue<Long>(initialSize);
-        switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(
-                initialSize);
-        txRates = new HashMap<Long, Map<Short, TxRates>>(initialSize);
-        descriptionListeners = new HashSet<IStatisticsListener>();
+        statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(INITIAL_SIZE);
+        pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(INITIAL_SIZE);
+        switchPortStatsUpdated = new LinkedBlockingQueue<Long>(INITIAL_SIZE);
+        switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(INITIAL_SIZE);
+        txRates = new HashMap<Long, Map<Short, TxRates>>(INITIAL_SIZE);
+        statisticsListeners = new CopyOnWriteArraySet<IOFStatisticsListener>();
 
         configStatsPollIntervals();
 
@@ -198,7 +191,7 @@ IInventoryShimExternalListener, CommandProvider {
                 while (true) {
                     try {
                         StatsRequest req = pendingStatsRequests.take();
-                        acquireStatistics(req.switchId, req.type);
+                        queryStatisticsInternal(req.switchId, req.type);
                     } catch (InterruptedException e) {
                         log.warn("Flow Statistics Collector thread "
                                 + "interrupted", e);
@@ -227,7 +220,7 @@ IInventoryShimExternalListener, CommandProvider {
      * Function called by the dependency manager when at least one dependency
      * become unsatisfied or when the component is shutting down because for
      * example bundle is being stopped.
-     * 
+     *
      */
     void destroy() {
     }
@@ -235,11 +228,11 @@ IInventoryShimExternalListener, CommandProvider {
     /**
      * Function called by dependency manager after "init ()" is called and after
      * the services provided by the class are registered in the service registry
-     * 
+     *
      */
     void start() {
         // Start managed timers
-        statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, tickPeriod);
+        statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, TICK);
 
         // Start statistics collector thread
         statisticsCollector.start();
@@ -255,28 +248,26 @@ IInventoryShimExternalListener, CommandProvider {
      * Function called by the dependency manager before the services exported by
      * the component are unregistered, this will be followed by a "destroy ()"
      * calls
-     * 
+     *
      */
     void stop() {
         // Stop managed timers
         statisticsTimer.cancel();
     }
 
-    public void setStatisticsListener(IStatisticsListener s) {
-        this.descriptionListeners.add(s);
+    public void setStatisticsListener(IOFStatisticsListener s) {
+        this.statisticsListeners.add(s);
     }
 
-    public void unsetStatisticsListener(IStatisticsListener s) {
+    public void unsetStatisticsListener(IOFStatisticsListener s) {
         if (s != null) {
-            this.descriptionListeners.remove(s);
+            this.statisticsListeners.remove(s);
         }
     }
 
     private void registerWithOSGIConsole() {
-        BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
-                .getBundleContext();
-        bundleContext.registerService(CommandProvider.class.getName(), this,
-                null);
+        BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
+        bundleContext.registerService(CommandProvider.class.getName(), this, null);
     }
 
     private static class StatsRequest {
@@ -337,7 +328,7 @@ IInventoryShimExternalListener, CommandProvider {
                                                                   // extension
                                                                   // stats
         statisticsTimerTicks.put(switchId, new StatisticsTicks(true));
-        log.info("Added Switch {} to target pool",
+        log.debug("Added Switch {} to target pool",
                 HexString.toHexString(switchId.longValue()));
     }
 
@@ -414,11 +405,9 @@ IInventoryShimExternalListener, CommandProvider {
     }
 
     private void printInfoMessage(String type, StatsRequest request) {
-        log.info(
-                "{} stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
-                new Object[] { type, HexString.toHexString(request.switchId),
-                        pendingStatsRequests.size(),
-                        statisticsCollector.getState().toString() });
+        log.info("{} stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
+                new Object[] {type, HexString.toHexString(request.switchId), pendingStatsRequests.size(),
+                statisticsCollector.getState().toString() });
     }
 
     protected void decrementTicks() {
@@ -427,10 +416,10 @@ IInventoryShimExternalListener, CommandProvider {
                 .entrySet()) {
             StatisticsTicks clock = entry.getValue();
             Long switchId = entry.getKey();
-            if (clock.decrementFlowTicksIsZero() == true) {
-                request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ? new StatsRequest(
-                        switchId, OFStatisticsType.VENDOR) : new StatsRequest(
-                        switchId, OFStatisticsType.FLOW);
+            if (clock.decrementFlowTicksIsZero()) {
+                request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ?
+                        new StatsRequest(switchId, OFStatisticsType.VENDOR) :
+                        new StatsRequest(switchId, OFStatisticsType.FLOW);
                 // If a request for this switch is already in the queue, skip to
                 // add this new request
                 if (!pendingStatsRequests.contains(request)
@@ -439,7 +428,7 @@ IInventoryShimExternalListener, CommandProvider {
                 }
             }
 
-            if (clock.decrementDescTicksIsZero() == true) {
+            if (clock.decrementDescTicksIsZero()) {
                 request = new StatsRequest(switchId, OFStatisticsType.DESC);
                 // If a request for this switch is already in the queue, skip to
                 // add this new request
@@ -449,7 +438,7 @@ IInventoryShimExternalListener, CommandProvider {
                 }
             }
 
-            if (clock.decrementPortTicksIsZero() == true) {
+            if (clock.decrementPortTicksIsZero()) {
                 request = new StatsRequest(switchId, OFStatisticsType.PORT);
                 // If a request for this switch is already in the queue, skip to
                 // add this new request
@@ -459,7 +448,7 @@ IInventoryShimExternalListener, CommandProvider {
                 }
             }
 
-            if(clock.decrementTableTicksIsZero() == true) {
+            if(clock.decrementTableTicksIsZero()) {
                 request = new StatsRequest(switchId, OFStatisticsType.TABLE);
                 // If a request for this switch is already in the queue, skip to
                 // add this new request
@@ -472,7 +461,7 @@ IInventoryShimExternalListener, CommandProvider {
     }
 
     private void removeStatsRequestTasks(Long switchId) {
-        log.info("Cleaning Statistics database for switch {}",
+        log.debug("Cleaning Statistics database for switch {}",
                 HexEncode.longToHexString(switchId));
         // To be safe, let's attempt removal of both VENDOR and FLOW request. It
         // does not hurt
@@ -495,54 +484,82 @@ IInventoryShimExternalListener, CommandProvider {
         statisticsTimerTicks.remove(switchId);
         removeStatsRequestTasks(switchId);
         flowStatistics.remove(switchId);
-        log.info("Statistics removed for switch {}",
+        log.debug("Statistics removed for switch {}",
                 HexString.toHexString(switchId));
     }
 
-    private void acquireStatistics(Long switchId, OFStatisticsType statType) {
+    private void queryStatisticsInternal(Long switchId, OFStatisticsType statType) {
 
         // Query the switch on all matches
-        List<OFStatistics> values = this.acquireStatistics(switchId, statType,
-                null);
+        List<OFStatistics> values = this.fetchStatisticsFromSwitch(switchId, statType, null);
 
-        // Update local caching database if got a valid response
+        // If got a valid response update local cache and notify listeners
         if (values != null && !values.isEmpty()) {
-            if ((statType == OFStatisticsType.FLOW)
-                    || (statType == OFStatisticsType.VENDOR)) {
-                flowStatistics.put(switchId, values);
-            } else if (statType == OFStatisticsType.DESC) {
-                // Notify who may be interested in a description change
-                notifyDescriptionListeners(switchId, values);
-
-                // Overwrite cache
-                descStatistics.put(switchId, values);
-            } else if (statType == OFStatisticsType.PORT) {
-                // Overwrite cache with new port statistics for this switch
-                portStatistics.put(switchId, values);
-
-                // Wake up the thread which maintains the TX byte counters for
-                // each port
-                switchPortStatsUpdated.offer(switchId);
-            } else if (statType == OFStatisticsType.TABLE) {
-                // Overwrite cache
-                tableStatistics.put(switchId, values);
+            switch (statType) {
+                case FLOW:
+                case VENDOR:
+                    flowStatistics.put(switchId, values);
+                    notifyFlowUpdate(switchId, values);
+                    break;
+                case DESC:
+                    // Overwrite cache
+                    descStatistics.put(switchId, values);
+                    // Notify who may be interested in a description change
+                    notifyDescriptionUpdate(switchId, values);
+                    break;
+                case PORT:
+                    // Overwrite cache with new port statistics for this switch
+                    portStatistics.put(switchId, values);
+
+                    // Wake up the thread which maintains the TX byte counters for
+                    // each port
+                    switchPortStatsUpdated.offer(switchId);
+                    notifyPortUpdate(switchId, values);
+                    break;
+                case TABLE:
+                    // Overwrite cache
+                    tableStatistics.put(switchId, values);
+                    notifyTableUpdate(switchId, values);
+                    break;
+                default:
             }
         }
     }
 
-    private void notifyDescriptionListeners(Long switchId,
-            List<OFStatistics> values) {
-        for (IStatisticsListener l : this.descriptionListeners) {
-            l.descriptionRefreshed(switchId,
-                    ((OFDescriptionStatistics) values.get(0)));
+    private void notifyDescriptionUpdate(Long switchId, List<OFStatistics> values) {
+        for (IOFStatisticsListener l : this.statisticsListeners) {
+            l.descriptionStatisticsRefreshed(switchId, values);
+        }
+    }
+
+    private void notifyFlowUpdate(Long switchId, List<OFStatistics> values) {
+        if (values.get(0) instanceof OFVendorStatistics) {
+            values = this.v6StatsListToOFStatsList(values);
+        }
+
+        for (IOFStatisticsListener l : this.statisticsListeners) {
+            l.flowStatisticsRefreshed(switchId, values);
+        }
+
+    }
+
+    private void notifyPortUpdate(Long switchId, List<OFStatistics> values) {
+        for (IOFStatisticsListener l : this.statisticsListeners) {
+            l.portStatisticsRefreshed(switchId, values);
+        }
+    }
+
+    private void notifyTableUpdate(Long switchId, List<OFStatistics> values) {
+        for (IOFStatisticsListener l : this.statisticsListeners) {
+            l.tableStatisticsRefreshed(switchId, values);
         }
     }
 
     /*
-     * Generic function to get the statistics form a OF switch
+     * Generic function to get the statistics form an OF switch
      */
     @SuppressWarnings("unchecked")
-    private List<OFStatistics> acquireStatistics(Long switchId,
+    private List<OFStatistics> fetchStatisticsFromSwitch(Long switchId,
             OFStatisticsType statsType, Object target) {
         List<OFStatistics> values = null;
         String type = null;
@@ -599,7 +616,7 @@ IInventoryShimExternalListener, CommandProvider {
                 short targetPort;
                 if (target == null) {
                     // All ports request
-                    targetPort = (short) OFPort.OFPP_NONE.getValue();
+                    targetPort = OFPort.OFPP_NONE.getValue();
                 } else if (!(target instanceof Short)) {
                     // Malformed request
                     log.warn("Invalid target type for Port stats request: {}",
@@ -617,7 +634,7 @@ IInventoryShimExternalListener, CommandProvider {
                 type = "PORT";
             } else if (statsType == OFStatisticsType.QUEUE) {
                 OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest();
-                specificReq.setPortNumber((short) OFPort.OFPP_ALL.getValue());
+                specificReq.setPortNumber(OFPort.OFPP_ALL.getValue());
                 specificReq.setQueueId(0xffffffff);
                 req.setStatistics(Collections
                         .singletonList((OFStatistics) specificReq));
@@ -680,7 +697,7 @@ IInventoryShimExternalListener, CommandProvider {
     }
 
     @Override
-    public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch) {
+    public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch, short priority) {
         List<OFStatistics> statsList = flowStatistics.get(switchId);
 
         /*
@@ -705,7 +722,7 @@ IInventoryShimExternalListener, CommandProvider {
             for (OFStatistics stats : targetList) {
                 V6StatsReply v6Stats = (V6StatsReply) stats;
                 V6Match v6Match = v6Stats.getMatch();
-                if (v6Match.equals(targetMatch)) {
+                if (v6Stats.getPriority() == priority && v6Match.equals(targetMatch)) {
                     List<OFStatistics> list = new ArrayList<OFStatistics>();
                     list.add(stats);
                     return list;
@@ -714,7 +731,7 @@ IInventoryShimExternalListener, CommandProvider {
         } else {
             for (OFStatistics stats : statsList) {
                 OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats;
-                if (flowStats.getMatch().equals(ofMatch)) {
+                if (flowStats.getPriority() == priority && flowStats.getMatch().equals(ofMatch)) {
                     List<OFStatistics> list = new ArrayList<OFStatistics>();
                     list.add(stats);
                     return list;
@@ -800,12 +817,11 @@ IInventoryShimExternalListener, CommandProvider {
             }
         }
 
-        List<OFStatistics> list = this.acquireStatistics(switchId, statType,
+        List<OFStatistics> list = this.fetchStatisticsFromSwitch(switchId, statType,
                 target);
 
-        return (list == null) ? null
-                : (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list)
-                        : list;
+        return (list == null) ? null :
+            (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list) : list;
     }
 
     @Override
@@ -896,12 +912,11 @@ IInventoryShimExternalListener, CommandProvider {
     /**
      * Update the cached port rates for this switch with the latest retrieved
      * port transmit byte count
-     * 
+     *
      * @param switchId
      */
     private synchronized void updatePortsTxRate(long switchId) {
-        List<OFStatistics> newPortStatistics = this.portStatistics
-                .get(switchId);
+        List<OFStatistics> newPortStatistics = this.portStatistics.get(switchId);
         if (newPortStatistics == null) {
             return;
         }
@@ -954,7 +969,7 @@ IInventoryShimExternalListener, CommandProvider {
         help.append("---OF Statistics Manager utilities---\n");
         help.append("\t ofdumpstatsmgr         - "
                 + "Print Internal Stats Mgr db\n");
-        help.append("\t ofstatsmgrintervals <fP> <pP> <dP>(in seconds) - "
+        help.append("\t ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds) - "
                 + "Set/Show flow/port/dedscription stats poll intervals\n");
         return help.toString();
     }
@@ -1087,9 +1102,9 @@ IInventoryShimExternalListener, CommandProvider {
 
         if (flowStatsInterv == null || portStatsInterv == null
                 || descStatsInterv == null) {
-            ci.println("Usage: ostatsmgrintervals <fP> <pP> <dP>(in seconds)");
-            ci.println("Current Values: fP=" + statisticsTickNumber + "s pP="
-                    + portTickNumber + "s dP=" + descriptionTickNumber + "s");
+            ci.println("Usage: ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds)");
+            ci.println("Current Values: fP=" + statisticsTickNumber + "sec pP="
+                    + portTickNumber + "sec dP=" + descriptionTickNumber + "sec tP=" + tableTickNumber + " sec");
             return;
         }
         Short fP, pP, dP, tP;