X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fstatisticsmanager%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fstatisticsmanager%2Finternal%2FStatisticsManager.java;h=f5c13b61053ee5eaaa7eab5594608cd3da4ce5d7;hp=f5f56fc70f887f3002c9f84a888d61a4e75476bc;hb=eed57e2b0afd50823bc882123b6cbac04bcc48d9;hpb=11837100975c9ad113e12668c09ecd78f9433f73 diff --git a/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManager.java b/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManager.java index f5f56fc70f..f5c13b6105 100644 --- a/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManager.java +++ b/opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManager.java @@ -10,33 +10,125 @@ package org.opendaylight.controller.statisticsmanager.internal; import java.util.ArrayList; +import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.opendaylight.controller.clustering.services.CacheConfigException; +import org.opendaylight.controller.clustering.services.CacheExistException; +import org.opendaylight.controller.clustering.services.IClusterContainerServices; +import org.opendaylight.controller.clustering.services.IClusterServices; import org.opendaylight.controller.forwardingrulesmanager.FlowEntry; +import org.opendaylight.controller.sal.core.IContainer; import org.opendaylight.controller.sal.core.Node; import org.opendaylight.controller.sal.core.NodeConnector; +import org.opendaylight.controller.sal.core.NodeTable; +import org.opendaylight.controller.sal.core.Property; +import org.opendaylight.controller.sal.core.UpdateType; import org.opendaylight.controller.sal.flowprogrammer.Flow; +import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates; import org.opendaylight.controller.sal.reader.FlowOnNode; import org.opendaylight.controller.sal.reader.IReadService; +import org.opendaylight.controller.sal.reader.IReadServiceListener; import org.opendaylight.controller.sal.reader.NodeConnectorStatistics; import org.opendaylight.controller.sal.reader.NodeDescription; +import org.opendaylight.controller.sal.reader.NodeTableStatistics; +import org.opendaylight.controller.sal.utils.ServiceHelper; import org.opendaylight.controller.statisticsmanager.IStatisticsManager; +import org.opendaylight.controller.switchmanager.ISwitchManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * The class which implements the methods for retrieving - * the network nodes statistics. + * The class caches latest network nodes statistics as notified by reader + * services and provides API to retrieve them. */ -public class StatisticsManager implements IStatisticsManager { - private static final Logger log = LoggerFactory - .getLogger(StatisticsManager.class); +public class StatisticsManager implements IStatisticsManager, IReadServiceListener, IListenInventoryUpdates { + private static final Logger log = LoggerFactory.getLogger(StatisticsManager.class); + private IContainer container; + private IClusterContainerServices clusterContainerService; private IReadService reader; + //statistics caches + private ConcurrentMap> flowStatistics; + private ConcurrentMap> nodeConnectorStatistics; + private ConcurrentMap> tableStatistics; + private ConcurrentMap descriptionStatistics; - public StatisticsManager() { + private void nonClusterObjectCreate() { + flowStatistics = new ConcurrentHashMap>(); + nodeConnectorStatistics = new ConcurrentHashMap>(); + tableStatistics = new ConcurrentHashMap>(); + descriptionStatistics = new ConcurrentHashMap(); + } + + @SuppressWarnings("deprecation") + private void allocateCaches() { + if (clusterContainerService == null) { + nonClusterObjectCreate(); + log.error("Clustering service unavailable. Allocated non-cluster statistics manager cache."); + return; + } + try { + clusterContainerService.createCache("statisticsmanager.flowStatistics", + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + clusterContainerService.createCache("statisticsmanager.nodeConnectorStatistics", + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + clusterContainerService.createCache("statisticsmanager.tableStatistics", + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + clusterContainerService.createCache("statisticsmanager.descriptionStatistics", + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); + + } catch (CacheConfigException cce) { + log.error("Statistics cache configuration invalid - check cache mode"); + } catch (CacheExistException ce) { + log.debug("Skipping statistics cache creation - already present"); + } + } + @SuppressWarnings({ "unchecked", "deprecation" }) + private void retrieveCaches() { + ConcurrentMap map; + + if (this.clusterContainerService == null) { + log.warn("Can't retrieve statistics manager cache, Clustering service unavailable."); + return; + } + + log.debug("Statistics Manager - retrieveCaches for Container {}", container); + + map = clusterContainerService.getCache("statisticsmanager.flowStatistics"); + if (map != null) { + this.flowStatistics = (ConcurrentMap>) map; + } else { + log.error("Cache allocation failed for statisticsmanager.flowStatistics in container {}", container.getName()); + } + + map = clusterContainerService.getCache("statisticsmanager.nodeConnectorStatistics"); + if (map != null) { + this.nodeConnectorStatistics = (ConcurrentMap>) map; + } else { + log.error("Cache allocation failed for statisticsmanager.nodeConnectorStatistics in container {}", container.getName()); + } + + map = clusterContainerService.getCache("statisticsmanager.tableStatistics"); + if (map != null) { + this.tableStatistics = (ConcurrentMap>) map; + } else { + log.error("Cache allocation failed for statisticsmanager.tableStatistics in container {}", container.getName()); + } + + map = clusterContainerService.getCache("statisticsmanager.descriptionStatistics"); + if (map != null) { + this.descriptionStatistics = (ConcurrentMap) map; + } else { + log.error("Cache allocation failed for statisticsmanager.descriptionStatistics in container {}", container.getName()); + } } /** @@ -46,6 +138,9 @@ public class StatisticsManager implements IStatisticsManager { */ void init() { log.debug("INIT called!"); + allocateCaches(); + retrieveCaches(); + } /** @@ -68,6 +163,27 @@ public class StatisticsManager implements IStatisticsManager { log.debug("START called!"); } + /** + * Function called after registering the service in OSGi service registry. + */ + void started(){ + //retrieve current statistics so we don't have to wait for next refresh + ISwitchManager switchManager = (ISwitchManager) ServiceHelper.getInstance( + ISwitchManager.class, container.getName(), this); + if (reader != null && switchManager != null) { + Set nodeSet = switchManager.getNodes(); + for (Node node : nodeSet) { + flowStatistics.put(node, reader.readAllFlows(node)); + descriptionStatistics.put(node, reader.readDescription(node)); + tableStatistics.put(node, reader.readNodeTable(node)); + nodeConnectorStatistics.put(node, reader.readNodeConnectors(node)); + } + + } else { + log.warn("Failed to retrieve current statistics. Statistics will not be immidiately available!"); + } + } + /** * Function called by the dependency manager before the services * exported by the component are unregistered, this will be @@ -78,56 +194,199 @@ public class StatisticsManager implements IStatisticsManager { log.debug("STOP called!"); } + void setClusterContainerService(IClusterContainerServices s) { + log.debug("Cluster Service set for Statistics Mgr"); + this.clusterContainerService = s; + } + + void unsetClusterContainerService(IClusterContainerServices s) { + if (this.clusterContainerService == s) { + log.debug("Cluster Service removed for Statistics Mgr!"); + this.clusterContainerService = null; + } + } + void setIContainer(IContainer c){ + container = c; + } + public void unsetIContainer(IContainer s) { + if (this.container == s) { + this.container = null; + } + } + public void setReaderService(IReadService service) { log.debug("Got inventory service set request {}", service); this.reader = service; } public void unsetReaderService(IReadService service) { - log.debug("Got a service UNset request"); + log.debug("Got a service UNset request {}", service); this.reader = null; } @Override public List getFlows(Node node) { - return reader.readAllFlows(node); + if (node == null) { + return null; + } + + List flowList = new ArrayList(); + List cachedList = flowStatistics.get(node); + if (cachedList != null){ + flowList.addAll(cachedList); + } + return flowList; } @Override - public Map> getFlowStatisticsForFlowList( - List flowList) { - Map> map = new HashMap>(); - if (flowList != null) { - for (FlowEntry entry : flowList) { - Node node = entry.getNode(); - Flow flow = entry.getFlow(); - List list = (map.containsKey(node)) ? map.get(node) - : new ArrayList(); - list.add(reader.readFlow(node, flow)); - map.put(node, list); + public Map> getFlowStatisticsForFlowList(List flowList) { + Map> statMapOutput = new HashMap>(); + + if (flowList == null || flowList.isEmpty()){ + return statMapOutput; + } + + Node node; + //index FlowEntries' flows by node so we don't traverse entire flow list for each flowEntry + Map> index = new HashMap>(); + for (FlowEntry flowEntry : flowList) { + node = flowEntry.getNode(); + Set set = (index.containsKey(node) ? index.get(node) : new HashSet()); + set.add(flowEntry.getFlow()); + index.put(node, set); + } + + //iterate over flows per indexed node and add to output + for (Entry> indexEntry : index.entrySet()) { + node = indexEntry.getKey(); + List flowsPerNode = flowStatistics.get(node); + + if (flowsPerNode != null && !flowsPerNode.isEmpty()){ + List filteredFlows = statMapOutput.containsKey(node) ? + statMapOutput.get(node) : new ArrayList(); + + for (FlowOnNode flowOnNode : flowsPerNode) { + if (indexEntry.getValue().contains(flowOnNode.getFlow())) { + filteredFlows.add(flowOnNode); + } + } + statMapOutput.put(node, filteredFlows); } } - return map; + return statMapOutput; } @Override public int getFlowsNumber(Node node) { - return reader.readAllFlows(node).size(); + List l; + if (node == null || (l = flowStatistics.get(node)) == null){ + return -1; + } + return l.size(); } @Override public NodeDescription getNodeDescription(Node node) { - return reader.readDescription(node); + if (node == null){ + return null; + } + NodeDescription nd = descriptionStatistics.get(node); + return nd != null? nd.clone() : null; } @Override - public NodeConnectorStatistics getNodeConnectorStatistics( - NodeConnector nodeConnector) { - return reader.readNodeConnector(nodeConnector); + public NodeConnectorStatistics getNodeConnectorStatistics(NodeConnector nodeConnector) { + if (nodeConnector == null){ + return null; + } + + List statList = nodeConnectorStatistics.get(nodeConnector.getNode()); + if (statList != null){ + for (NodeConnectorStatistics stat : statList) { + if (stat.getNodeConnector().equals(nodeConnector)){ + return stat; + } + } + } + return null; } @Override public List getNodeConnectorStatistics(Node node) { - return reader.readNodeConnectors(node); + if (node == null){ + return null; + } + + List statList = new ArrayList(); + List cachedList = nodeConnectorStatistics.get(node); + if (cachedList != null) { + statList.addAll(cachedList); + } + return statList; + } + + @Override + public NodeTableStatistics getNodeTableStatistics(NodeTable nodeTable) { + if (nodeTable == null){ + return null; + } + List statList = tableStatistics.get(nodeTable.getNode()); + if (statList != null){ + for (NodeTableStatistics stat : statList) { + if (stat.getNodeTable().getID().equals(nodeTable.getID())){ + return stat; + } + } + } + return null; + } + + @Override + public List getNodeTableStatistics(Node node){ + if (node == null){ + return null; + } + List statList = new ArrayList(); + List cachedList = tableStatistics.get(node); + if (cachedList != null) { + statList.addAll(cachedList); + } + return statList; + } + + @Override + public void nodeFlowStatisticsUpdated(Node node, List flowStatsList) { + this.flowStatistics.put(node, flowStatsList); + } + + @Override + public void nodeConnectorStatisticsUpdated(Node node, List ncStatsList) { + this.nodeConnectorStatistics.put(node, ncStatsList); + } + + @Override + public void nodeTableStatisticsUpdated(Node node, List tableStatsList) { + this.tableStatistics.put(node, tableStatsList); + } + + @Override + public void descriptionStatisticsUpdated(Node node, NodeDescription nodeDescription) { + this.descriptionStatistics.put(node, nodeDescription); + } + + @Override + public void updateNode(Node node, UpdateType type, Set props) { + //if node is removed, remove stats mappings + if (type == UpdateType.REMOVED) { + flowStatistics.remove(node); + nodeConnectorStatistics.remove(node); + tableStatistics.remove(node); + descriptionStatistics.remove(node); + } + } + + @Override + public void updateNodeConnector(NodeConnector nodeConnector, UpdateType type, Set props) { + // not interested in this update } }