StatisticsManager "started" method may fail
[controller.git] / opendaylight / statisticsmanager / implementation / src / main / java / org / opendaylight / controller / statisticsmanager / internal / StatisticsManager.java
index f5f56fc70f887f3002c9f84a888d61a4e75476bc..57dfa91b964956e68336673debbca76288d5a00d 100644 (file)
 package org.opendaylight.controller.statisticsmanager.internal;
 
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
+import org.opendaylight.controller.clustering.services.CacheConfigException;
+import org.opendaylight.controller.clustering.services.CacheExistException;
+import org.opendaylight.controller.clustering.services.IClusterContainerServices;
+import org.opendaylight.controller.clustering.services.IClusterServices;
 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
+import org.opendaylight.controller.sal.core.IContainer;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.NodeConnector;
+import org.opendaylight.controller.sal.core.NodeTable;
+import org.opendaylight.controller.sal.core.Property;
+import org.opendaylight.controller.sal.core.UpdateType;
 import org.opendaylight.controller.sal.flowprogrammer.Flow;
+import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates;
 import org.opendaylight.controller.sal.reader.FlowOnNode;
 import org.opendaylight.controller.sal.reader.IReadService;
+import org.opendaylight.controller.sal.reader.IReadServiceListener;
 import org.opendaylight.controller.sal.reader.NodeConnectorStatistics;
 import org.opendaylight.controller.sal.reader.NodeDescription;
+import org.opendaylight.controller.sal.reader.NodeTableStatistics;
+import org.opendaylight.controller.sal.utils.ServiceHelper;
 import org.opendaylight.controller.statisticsmanager.IStatisticsManager;
+import org.opendaylight.controller.switchmanager.ISwitchManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The class which implements the methods for retrieving
- * the network nodes statistics.
+ * The class caches latest network nodes statistics as notified by reader
+ * services and provides API to retrieve them.
  */
-public class StatisticsManager implements IStatisticsManager {
-    private static final Logger log = LoggerFactory
-            .getLogger(StatisticsManager.class);
+public class StatisticsManager implements IStatisticsManager, IReadServiceListener, IListenInventoryUpdates {
+    private static final Logger log = LoggerFactory.getLogger(StatisticsManager.class);
+    private IContainer container;
+    private IClusterContainerServices clusterContainerService;
     private IReadService reader;
+    //statistics caches
+    private ConcurrentMap<Node, List<FlowOnNode>> flowStatistics;
+    private ConcurrentMap<Node, List<NodeConnectorStatistics>> nodeConnectorStatistics;
+    private ConcurrentMap<Node, List<NodeTableStatistics>> tableStatistics;
+    private ConcurrentMap<Node, NodeDescription> descriptionStatistics;
 
-    public StatisticsManager() {
+    private void nonClusterObjectCreate() {
+        flowStatistics = new ConcurrentHashMap<Node, List<FlowOnNode>>();
+        nodeConnectorStatistics = new ConcurrentHashMap<Node, List<NodeConnectorStatistics>>();
+        tableStatistics = new ConcurrentHashMap<Node, List<NodeTableStatistics>>();
+        descriptionStatistics = new ConcurrentHashMap<Node, NodeDescription>();
+    }
+
+    @SuppressWarnings("deprecation")
+    private void allocateCaches() {
+        if (clusterContainerService == null) {
+            nonClusterObjectCreate();
+            log.error("Clustering service unavailable. Allocated non-cluster statistics manager cache.");
+            return;
+        }
+
+        try {
+            clusterContainerService.createCache("statisticsmanager.flowStatistics",
+                    EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            clusterContainerService.createCache("statisticsmanager.nodeConnectorStatistics",
+                    EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            clusterContainerService.createCache("statisticsmanager.tableStatistics",
+                    EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            clusterContainerService.createCache("statisticsmanager.descriptionStatistics",
+                    EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+
+        } catch (CacheConfigException cce) {
+            log.error("Statistics cache configuration invalid - check cache mode");
+        } catch (CacheExistException ce) {
+            log.debug("Skipping statistics cache creation - already present");
+        }
+    }
+    @SuppressWarnings({ "unchecked", "deprecation" })
+    private void retrieveCaches() {
+        ConcurrentMap<?, ?> map;
+
+        if (this.clusterContainerService == null) {
+            log.warn("Can't retrieve statistics manager cache, Clustering service unavailable.");
+            return;
+        }
+
+        log.debug("Statistics Manager - retrieveCaches for Container {}", container);
+
+        map = clusterContainerService.getCache("statisticsmanager.flowStatistics");
+        if (map != null) {
+            this.flowStatistics = (ConcurrentMap<Node, List<FlowOnNode>>) map;
+        } else {
+            log.error("Cache allocation failed for statisticsmanager.flowStatistics in container {}", container.getName());
+        }
+
+        map = clusterContainerService.getCache("statisticsmanager.nodeConnectorStatistics");
+        if (map != null) {
+            this.nodeConnectorStatistics = (ConcurrentMap<Node, List<NodeConnectorStatistics>>) map;
+        } else {
+            log.error("Cache allocation failed for statisticsmanager.nodeConnectorStatistics in container {}", container.getName());
+        }
 
+        map = clusterContainerService.getCache("statisticsmanager.tableStatistics");
+        if (map != null) {
+            this.tableStatistics = (ConcurrentMap<Node, List<NodeTableStatistics>>) map;
+        } else {
+            log.error("Cache allocation failed for statisticsmanager.tableStatistics in container {}", container.getName());
+        }
+
+        map = clusterContainerService.getCache("statisticsmanager.descriptionStatistics");
+        if (map != null) {
+            this.descriptionStatistics = (ConcurrentMap<Node, NodeDescription>) map;
+        } else {
+            log.error("Cache allocation failed for statisticsmanager.descriptionStatistics in container {}", container.getName());
+        }
     }
 
     /**
@@ -46,6 +138,9 @@ public class StatisticsManager implements IStatisticsManager {
      */
     void init() {
         log.debug("INIT called!");
+        allocateCaches();
+        retrieveCaches();
+
     }
 
     /**
@@ -68,6 +163,39 @@ public class StatisticsManager implements IStatisticsManager {
         log.debug("START called!");
     }
 
+    /**
+     * Function called after registering the service in OSGi service registry.
+     */
+    void started(){
+        // Retrieve current statistics so we don't have to wait for next refresh
+        ISwitchManager switchManager = (ISwitchManager) ServiceHelper.getInstance(
+                ISwitchManager.class, container.getName(), this);
+        if ((reader != null) && (switchManager != null)) {
+            Set<Node> nodeSet = switchManager.getNodes();
+            for (Node node : nodeSet) {
+                List<FlowOnNode> flows = reader.readAllFlows(node);
+                if (flows != null) {
+                    flowStatistics.put(node, flows);
+                }
+                NodeDescription descr = reader.readDescription(node);
+                if (descr != null) {
+                    descriptionStatistics.put(node, descr);
+                }
+                List<NodeTableStatistics> tableStats = reader.readNodeTable(node);
+                if (tableStats != null) {
+                    tableStatistics.put(node, tableStats);
+                }
+                List<NodeConnectorStatistics> ncStats = reader.readNodeConnectors(node);
+                if (ncStats != null) {
+                    nodeConnectorStatistics.put(node, ncStats);
+                }
+            }
+
+        } else {
+            log.trace("Failed to retrieve current statistics. Statistics will not be immediately available!");
+        }
+    }
+
     /**
      * Function called by the dependency manager before the services
      * exported by the component are unregistered, this will be
@@ -78,56 +206,212 @@ public class StatisticsManager implements IStatisticsManager {
         log.debug("STOP called!");
     }
 
+    void setClusterContainerService(IClusterContainerServices s) {
+        log.debug("Cluster Service set for Statistics Mgr");
+        this.clusterContainerService = s;
+    }
+
+    void unsetClusterContainerService(IClusterContainerServices s) {
+        if (this.clusterContainerService == s) {
+            log.debug("Cluster Service removed for Statistics Mgr!");
+            this.clusterContainerService = null;
+        }
+    }
+    void setIContainer(IContainer c){
+        container = c;
+    }
+    public void unsetIContainer(IContainer s) {
+        if (this.container == s) {
+            this.container = null;
+        }
+    }
+
     public void setReaderService(IReadService service) {
         log.debug("Got inventory service set request {}", service);
         this.reader = service;
     }
 
     public void unsetReaderService(IReadService service) {
-        log.debug("Got a service UNset request");
+        log.debug("Got a service UNset request {}", service);
         this.reader = null;
     }
 
     @Override
     public List<FlowOnNode> getFlows(Node node) {
-        return reader.readAllFlows(node);
+        if (node == null) {
+            return null;
+        }
+
+        List<FlowOnNode> flowList = new ArrayList<FlowOnNode>();
+        List<FlowOnNode> cachedList = flowStatistics.get(node);
+        if (cachedList != null){
+            flowList.addAll(cachedList);
+        }
+        return flowList;
     }
 
     @Override
-    public Map<Node, List<FlowOnNode>> getFlowStatisticsForFlowList(
-            List<FlowEntry> flowList) {
-        Map<Node, List<FlowOnNode>> map = new HashMap<Node, List<FlowOnNode>>();
-        if (flowList != null) {
-            for (FlowEntry entry : flowList) {
-                Node node = entry.getNode();
-                Flow flow = entry.getFlow();
-                List<FlowOnNode> list = (map.containsKey(node)) ? map.get(node)
-                        : new ArrayList<FlowOnNode>();
-                list.add(reader.readFlow(node, flow));
-                map.put(node, list);
+    public Map<Node, List<FlowOnNode>> getFlowStatisticsForFlowList(List<FlowEntry> flowList) {
+        Map<Node, List<FlowOnNode>> statMapOutput = new HashMap<Node, List<FlowOnNode>>();
+
+        if (flowList == null || flowList.isEmpty()){
+            return statMapOutput;
+        }
+
+        Node node;
+        // Index FlowEntries' flows by node so we don't traverse entire flow list for each flowEntry
+        Map<Node, Set<Flow>> index = new HashMap<Node, Set<Flow>>();
+        for (FlowEntry flowEntry : flowList) {
+            node = flowEntry.getNode();
+            Set<Flow> set = (index.containsKey(node) ? index.get(node) : new HashSet<Flow>());
+            set.add(flowEntry.getFlow());
+            index.put(node, set);
+        }
+
+        // Iterate over flows per indexed node and add to output
+        for (Entry<Node, Set<Flow>> indexEntry : index.entrySet()) {
+            node = indexEntry.getKey();
+            List<FlowOnNode> flowsPerNode = flowStatistics.get(node);
+
+            if (flowsPerNode != null && !flowsPerNode.isEmpty()){
+                List<FlowOnNode> filteredFlows = statMapOutput.containsKey(node) ?
+                        statMapOutput.get(node) : new ArrayList<FlowOnNode>();
+
+                for (FlowOnNode flowOnNode : flowsPerNode) {
+                    if (indexEntry.getValue().contains(flowOnNode.getFlow())) {
+                        filteredFlows.add(flowOnNode);
+                    }
+                }
+                statMapOutput.put(node, filteredFlows);
             }
         }
-        return map;
+        return statMapOutput;
     }
 
     @Override
     public int getFlowsNumber(Node node) {
-        return reader.readAllFlows(node).size();
+        List<FlowOnNode> l;
+        if (node == null || (l = flowStatistics.get(node)) == null){
+            return -1;
+        }
+        return l.size();
     }
 
     @Override
     public NodeDescription getNodeDescription(Node node) {
-        return reader.readDescription(node);
+        if (node == null){
+            return null;
+        }
+        NodeDescription nd = descriptionStatistics.get(node);
+        return nd != null? nd.clone() : null;
     }
 
     @Override
-    public NodeConnectorStatistics getNodeConnectorStatistics(
-            NodeConnector nodeConnector) {
-        return reader.readNodeConnector(nodeConnector);
+    public NodeConnectorStatistics getNodeConnectorStatistics(NodeConnector nodeConnector) {
+        if (nodeConnector == null){
+            return null;
+        }
+
+        List<NodeConnectorStatistics> statList = nodeConnectorStatistics.get(nodeConnector.getNode());
+        if (statList != null){
+            for (NodeConnectorStatistics stat : statList) {
+                if (stat.getNodeConnector().equals(nodeConnector)){
+                    return stat;
+                }
+            }
+        }
+        return null;
     }
 
     @Override
     public List<NodeConnectorStatistics> getNodeConnectorStatistics(Node node) {
-        return reader.readNodeConnectors(node);
+        if (node == null){
+            return null;
+        }
+
+        List<NodeConnectorStatistics> statList = new ArrayList<NodeConnectorStatistics>();
+        List<NodeConnectorStatistics> cachedList = nodeConnectorStatistics.get(node);
+        if (cachedList != null) {
+            statList.addAll(cachedList);
+        }
+        return statList;
+    }
+
+    @Override
+    public NodeTableStatistics getNodeTableStatistics(NodeTable nodeTable) {
+        if (nodeTable == null){
+            return null;
+        }
+        List<NodeTableStatistics> statList = tableStatistics.get(nodeTable.getNode());
+        if (statList != null){
+            for (NodeTableStatistics stat : statList) {
+                if (stat.getNodeTable().getID().equals(nodeTable.getID())){
+                    return stat;
+                }
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public List<NodeTableStatistics> getNodeTableStatistics(Node node){
+        if (node == null){
+            return null;
+        }
+        List<NodeTableStatistics> statList = new ArrayList<NodeTableStatistics>();
+        List<NodeTableStatistics> cachedList = tableStatistics.get(node);
+        if (cachedList != null) {
+            statList.addAll(cachedList);
+        }
+        return statList;
+    }
+
+    @Override
+    public void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList) {
+        List<FlowOnNode> currentStat = this.flowStatistics.get(node);
+        // Update cache only if changed to avoid unnecessary cache sync operations
+        if (! flowStatsList.equals(currentStat)){
+            this.flowStatistics.put(node, flowStatsList);
+        }
+    }
+
+    @Override
+    public void nodeConnectorStatisticsUpdated(Node node, List<NodeConnectorStatistics> ncStatsList) {
+        List<NodeConnectorStatistics> currentStat = this.nodeConnectorStatistics.get(node);
+        if (! ncStatsList.equals(currentStat)){
+            this.nodeConnectorStatistics.put(node, ncStatsList);
+        }
+    }
+
+    @Override
+    public void nodeTableStatisticsUpdated(Node node, List<NodeTableStatistics> tableStatsList) {
+        List<NodeTableStatistics> currentStat = this.tableStatistics.get(node);
+        if (! tableStatsList.equals(currentStat)) {
+            this.tableStatistics.put(node, tableStatsList);
+        }
+    }
+
+    @Override
+    public void descriptionStatisticsUpdated(Node node, NodeDescription nodeDescription) {
+        NodeDescription currentDesc = this.descriptionStatistics.get(node);
+        if (! nodeDescription.equals(currentDesc)){
+            this.descriptionStatistics.put(node, nodeDescription);
+        }
+    }
+
+    @Override
+    public void updateNode(Node node, UpdateType type, Set<Property> props) {
+        // If node is removed, clean up stats mappings
+        if (type == UpdateType.REMOVED) {
+            flowStatistics.remove(node);
+            nodeConnectorStatistics.remove(node);
+            tableStatistics.remove(node);
+            descriptionStatistics.remove(node);
+        }
+    }
+
+    @Override
+    public void updateNodeConnector(NodeConnector nodeConnector, UpdateType type, Set<Property> props) {
+        // Not interested in this update
     }
 }