3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.controller.statisticsmanager.internal;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.EnumSet;
15 import java.util.HashMap;
16 import java.util.HashSet;
17 import java.util.List;
19 import java.util.Map.Entry;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicInteger;
29 import org.opendaylight.controller.clustering.services.CacheConfigException;
30 import org.opendaylight.controller.clustering.services.CacheExistException;
31 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
32 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
33 import org.opendaylight.controller.clustering.services.IClusterServices;
34 import org.opendaylight.controller.connectionmanager.IConnectionManager;
35 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
36 import org.opendaylight.controller.sal.connection.ConnectionLocality;
37 import org.opendaylight.controller.sal.core.IContainer;
38 import org.opendaylight.controller.sal.core.Node;
39 import org.opendaylight.controller.sal.core.NodeConnector;
40 import org.opendaylight.controller.sal.core.NodeTable;
41 import org.opendaylight.controller.sal.core.Property;
42 import org.opendaylight.controller.sal.core.UpdateType;
43 import org.opendaylight.controller.sal.flowprogrammer.Flow;
44 import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates;
45 import org.opendaylight.controller.sal.reader.FlowOnNode;
46 import org.opendaylight.controller.sal.reader.IReadService;
47 import org.opendaylight.controller.sal.reader.IReadServiceListener;
48 import org.opendaylight.controller.sal.reader.NodeConnectorStatistics;
49 import org.opendaylight.controller.sal.reader.NodeDescription;
50 import org.opendaylight.controller.sal.reader.NodeTableStatistics;
51 import org.opendaylight.controller.sal.utils.ServiceHelper;
52 import org.opendaylight.controller.statisticsmanager.IStatisticsManager;
53 import org.opendaylight.controller.switchmanager.ISwitchManager;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
 * This class caches the latest network node statistics as notified by the
 * reader services and provides an API to retrieve them.
61 public class StatisticsManager implements IStatisticsManager, IReadServiceListener, IListenInventoryUpdates,
62 ICacheUpdateAware<Object,Object> {
63 private static final Logger log = LoggerFactory.getLogger(StatisticsManager.class);
64 private IContainer container;
65 private IClusterContainerServices clusterContainerService;
66 private IReadService reader;
67 private IConnectionManager connectionManager;
69 private ConcurrentMap<Node, List<FlowOnNode>> flowStatistics;
70 private ConcurrentMap<Node, List<NodeConnectorStatistics>> nodeConnectorStatistics;
71 private ConcurrentMap<Node, List<NodeTableStatistics>> tableStatistics;
72 private ConcurrentMap<Node, NodeDescription> descriptionStatistics;
74 // data structure for latches
75 // this is not a cluster cache
76 private ConcurrentMap<Node, CountDownLatch> latches = new ConcurrentHashMap<Node, CountDownLatch>();
77 // 30 seconds is the timeout.
78 // the value of this can be tweaked based on performance tests.
79 private static long latchTimeout = 30;
81 // cache for flow stats refresh triggers
82 // an entry added to this map triggers the statistics manager
83 // to which the node is connected to get the latest flow stats from that node
84 // this is a cluster cache
85 private ConcurrentMap<Integer, Node> triggers;
87 // use an atomic integer for the triggers key
88 private AtomicInteger triggerKey = new AtomicInteger();
90 // single thread executor for the triggers
91 private ExecutorService triggerExecutor;
93 static final String TRIGGERS_CACHE = "statisticsmanager.triggers";
94 static final String FLOW_STATISTICS_CACHE = "statisticsmanager.flowStatistics";
96 private void nonClusterObjectCreate() {
97 flowStatistics = new ConcurrentHashMap<Node, List<FlowOnNode>>();
98 nodeConnectorStatistics = new ConcurrentHashMap<Node, List<NodeConnectorStatistics>>();
99 tableStatistics = new ConcurrentHashMap<Node, List<NodeTableStatistics>>();
100 descriptionStatistics = new ConcurrentHashMap<Node, NodeDescription>();
101 triggers = new ConcurrentHashMap<Integer, Node>();
104 private void allocateCaches() {
105 if (clusterContainerService == null) {
106 nonClusterObjectCreate();
107 log.error("Clustering service unavailable. Allocated non-cluster statistics manager cache.");
112 clusterContainerService.createCache(FLOW_STATISTICS_CACHE,
113 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
114 clusterContainerService.createCache("statisticsmanager.nodeConnectorStatistics",
115 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
116 clusterContainerService.createCache("statisticsmanager.tableStatistics",
117 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
118 clusterContainerService.createCache("statisticsmanager.descriptionStatistics",
119 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
120 clusterContainerService.createCache(TRIGGERS_CACHE,
121 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
122 } catch (CacheConfigException cce) {
123 log.error("Statistics cache configuration invalid - check cache mode");
124 } catch (CacheExistException ce) {
125 log.debug("Skipping statistics cache creation - already present");
128 @SuppressWarnings({ "unchecked" })
129 private void retrieveCaches() {
130 ConcurrentMap<?, ?> map;
132 if (this.clusterContainerService == null) {
133 log.warn("Can't retrieve statistics manager cache, Clustering service unavailable.");
137 log.debug("Statistics Manager - retrieveCaches for Container {}", container);
139 map = clusterContainerService.getCache(FLOW_STATISTICS_CACHE);
141 this.flowStatistics = (ConcurrentMap<Node, List<FlowOnNode>>) map;
143 log.error("Cache allocation failed for statisticsmanager.flowStatistics in container {}", container.getName());
146 map = clusterContainerService.getCache("statisticsmanager.nodeConnectorStatistics");
148 this.nodeConnectorStatistics = (ConcurrentMap<Node, List<NodeConnectorStatistics>>) map;
150 log.error("Cache allocation failed for statisticsmanager.nodeConnectorStatistics in container {}", container.getName());
153 map = clusterContainerService.getCache("statisticsmanager.tableStatistics");
155 this.tableStatistics = (ConcurrentMap<Node, List<NodeTableStatistics>>) map;
157 log.error("Cache allocation failed for statisticsmanager.tableStatistics in container {}", container.getName());
160 map = clusterContainerService.getCache("statisticsmanager.descriptionStatistics");
162 this.descriptionStatistics = (ConcurrentMap<Node, NodeDescription>) map;
164 log.error("Cache allocation failed for statisticsmanager.descriptionStatistics in container {}", container.getName());
167 map = clusterContainerService.getCache(TRIGGERS_CACHE);
169 this.triggers = (ConcurrentMap<Integer, Node>) map;
171 log.error("Cache allocation failed for " + TRIGGERS_CACHE +" in container {}", container.getName());
176 * Function called by the dependency manager when all the required
177 * dependencies are satisfied
181 log.debug("INIT called!");
 * Function called by the dependency manager when at least one
 * dependency becomes unsatisfied or when the component is shutting
 * down, for example because the bundle is being stopped.
194 log.debug("DESTROY called!");
198 * Function called by dependency manager after "init ()" is called
199 * and after the services provided by the class are registered in
200 * the service registry
204 log.debug("START called!");
205 this.triggerExecutor = Executors.newSingleThreadExecutor();
209 * Function called after registering the service in OSGi service registry.
212 // Retrieve current statistics so we don't have to wait for next refresh
213 ISwitchManager switchManager = (ISwitchManager) ServiceHelper.getInstance(
214 ISwitchManager.class, container.getName(), this);
215 if ((reader != null) && (switchManager != null)) {
216 Set<Node> nodeSet = switchManager.getNodes();
217 for (Node node : nodeSet) {
218 List<FlowOnNode> flows = reader.readAllFlows(node);
220 flowStatistics.put(node, flows);
222 NodeDescription descr = reader.readDescription(node);
224 descriptionStatistics.put(node, descr);
226 List<NodeTableStatistics> tableStats = reader.readNodeTable(node);
227 if (tableStats != null) {
228 tableStatistics.put(node, tableStats);
230 List<NodeConnectorStatistics> ncStats = reader.readNodeConnectors(node);
231 if (ncStats != null) {
232 nodeConnectorStatistics.put(node, ncStats);
237 log.trace("Failed to retrieve current statistics. Statistics will not be immediately available!");
 * Function called by the dependency manager before the services
 * exported by the component are unregistered; this will be
 * followed by a "destroy ()" call
248 log.debug("STOP called!");
249 this.triggerExecutor.shutdownNow();
252 void setClusterContainerService(IClusterContainerServices s) {
253 log.debug("Cluster Service set for Statistics Mgr");
254 this.clusterContainerService = s;
257 void unsetClusterContainerService(IClusterContainerServices s) {
258 if (this.clusterContainerService == s) {
259 log.debug("Cluster Service removed for Statistics Mgr!");
260 this.clusterContainerService = null;
263 void setIContainer(IContainer c){
266 public void unsetIContainer(IContainer s) {
267 if (this.container == s) {
268 this.container = null;
272 public void setReaderService(IReadService service) {
273 log.debug("Got inventory service set request {}", service);
274 this.reader = service;
277 public void unsetReaderService(IReadService service) {
278 log.debug("Got a service UNset request {}", service);
283 public List<FlowOnNode> getFlows(Node node) {
285 return Collections.emptyList();
288 List<FlowOnNode> flowList = new ArrayList<FlowOnNode>();
289 List<FlowOnNode> cachedList = flowStatistics.get(node);
290 if (cachedList != null){
291 flowList.addAll(cachedList);
300 public List<FlowOnNode> getFlowsNoCache(Node node) {
302 return Collections.emptyList();
304 // check if the node is local to this controller
305 ConnectionLocality locality = ConnectionLocality.LOCAL;
306 if(this.connectionManager != null) {
307 locality = this.connectionManager.getLocalityStatus(node);
309 if (locality == ConnectionLocality.NOT_LOCAL) {
310 // send a trigger to all and wait for either a response or timeout
311 CountDownLatch newLatch = new CountDownLatch(1);
312 CountDownLatch oldLatch = this.latches.putIfAbsent(node, newLatch);
313 this.triggers.put(this.triggerKey.incrementAndGet(), node);
316 if(oldLatch != null) {
317 retStatus = oldLatch.await(this.latchTimeout, TimeUnit.SECONDS);
319 retStatus = newLatch.await(this.latchTimeout, TimeUnit.SECONDS);
321 // log the return code as it will give us, if
322 // the latch timed out.
323 log.debug("latch timed out {}", !retStatus);
324 } catch (InterruptedException e) {
325 // log the error and move on
326 log.warn("Waiting for statistics response interrupted", e);
327 // restore the interrupt status
328 // its a good practice to restore the interrupt status
329 // if you are not propagating the InterruptedException
330 Thread.currentThread().interrupt();
332 // now that the wait is over
333 // remove the latch entry
334 this.latches.remove(node);
336 // the node is local.
337 // call the read service
338 if (this.reader != null) {
339 List<FlowOnNode> flows = reader.nonCachedReadAllFlows(node);
341 nodeFlowStatisticsUpdated(node, flows);
345 // at this point we are ready to return the cached value.
346 // this cached value will be up to date with a very high probability
347 // due to what we have done previously ie:- send a trigger for cache update
348 // or refreshed the cache if the node is local.
349 return getFlows(node);
353 public Map<Node, List<FlowOnNode>> getFlowStatisticsForFlowList(List<FlowEntry> flowList) {
354 Map<Node, List<FlowOnNode>> statMapOutput = new HashMap<Node, List<FlowOnNode>>();
356 if (flowList == null || flowList.isEmpty()){
357 return statMapOutput;
361 // Index FlowEntries' flows by node so we don't traverse entire flow list for each flowEntry
362 Map<Node, Set<Flow>> index = new HashMap<Node, Set<Flow>>();
363 for (FlowEntry flowEntry : flowList) {
364 node = flowEntry.getNode();
365 Set<Flow> set = (index.containsKey(node) ? index.get(node) : new HashSet<Flow>());
366 set.add(flowEntry.getFlow());
367 index.put(node, set);
370 // Iterate over flows per indexed node and add to output
371 for (Entry<Node, Set<Flow>> indexEntry : index.entrySet()) {
372 node = indexEntry.getKey();
373 List<FlowOnNode> flowsPerNode = flowStatistics.get(node);
375 if (flowsPerNode != null && !flowsPerNode.isEmpty()){
376 List<FlowOnNode> filteredFlows = statMapOutput.containsKey(node) ?
377 statMapOutput.get(node) : new ArrayList<FlowOnNode>();
379 for (FlowOnNode flowOnNode : flowsPerNode) {
380 if (indexEntry.getValue().contains(flowOnNode.getFlow())) {
381 filteredFlows.add(flowOnNode);
384 statMapOutput.put(node, filteredFlows);
387 return statMapOutput;
391 public int getFlowsNumber(Node node) {
393 if (node == null || (l = flowStatistics.get(node)) == null){
400 public NodeDescription getNodeDescription(Node node) {
404 NodeDescription nd = descriptionStatistics.get(node);
405 return nd != null? nd.clone() : null;
409 public NodeConnectorStatistics getNodeConnectorStatistics(NodeConnector nodeConnector) {
410 if (nodeConnector == null){
414 List<NodeConnectorStatistics> statList = nodeConnectorStatistics.get(nodeConnector.getNode());
415 if (statList != null){
416 for (NodeConnectorStatistics stat : statList) {
417 if (stat.getNodeConnector().equals(nodeConnector)){
426 public List<NodeConnectorStatistics> getNodeConnectorStatistics(Node node) {
428 return Collections.emptyList();
431 List<NodeConnectorStatistics> statList = new ArrayList<NodeConnectorStatistics>();
432 List<NodeConnectorStatistics> cachedList = nodeConnectorStatistics.get(node);
433 if (cachedList != null) {
434 statList.addAll(cachedList);
440 public NodeTableStatistics getNodeTableStatistics(NodeTable nodeTable) {
441 if (nodeTable == null){
444 List<NodeTableStatistics> statList = tableStatistics.get(nodeTable.getNode());
445 if (statList != null){
446 for (NodeTableStatistics stat : statList) {
447 if (stat.getNodeTable().getID().equals(nodeTable.getID())){
456 public List<NodeTableStatistics> getNodeTableStatistics(Node node){
458 return Collections.emptyList();
460 List<NodeTableStatistics> statList = new ArrayList<NodeTableStatistics>();
461 List<NodeTableStatistics> cachedList = tableStatistics.get(node);
462 if (cachedList != null) {
463 statList.addAll(cachedList);
469 public void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList) {
470 // No equality check because duration fields change constantly
471 this.flowStatistics.put(node, flowStatsList);
475 public void nodeConnectorStatisticsUpdated(Node node, List<NodeConnectorStatistics> ncStatsList) {
476 List<NodeConnectorStatistics> currentStat = this.nodeConnectorStatistics.get(node);
477 if (! ncStatsList.equals(currentStat)){
478 this.nodeConnectorStatistics.put(node, ncStatsList);
483 public void nodeTableStatisticsUpdated(Node node, List<NodeTableStatistics> tableStatsList) {
484 List<NodeTableStatistics> currentStat = this.tableStatistics.get(node);
485 if (! tableStatsList.equals(currentStat)) {
486 this.tableStatistics.put(node, tableStatsList);
491 public void descriptionStatisticsUpdated(Node node, NodeDescription nodeDescription) {
492 NodeDescription currentDesc = this.descriptionStatistics.get(node);
493 if (! nodeDescription.equals(currentDesc)){
494 this.descriptionStatistics.put(node, nodeDescription);
499 public void updateNode(Node node, UpdateType type, Set<Property> props) {
500 // If node is removed, clean up stats mappings
501 if (type == UpdateType.REMOVED) {
502 flowStatistics.remove(node);
503 nodeConnectorStatistics.remove(node);
504 tableStatistics.remove(node);
505 descriptionStatistics.remove(node);
510 public void updateNodeConnector(NodeConnector nodeConnector, UpdateType type, Set<Property> props) {
511 // Not interested in this update
514 public void unsetIConnectionManager(IConnectionManager s) {
515 if (s == this.connectionManager) {
516 this.connectionManager = null;
520 public void setIConnectionManager(IConnectionManager s) {
521 this.connectionManager = s;
525 public void entryCreated(Object key, String cacheName, boolean originLocal) {
532 public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) {
535 * Local updates are of no interest
539 if (cacheName.equals(TRIGGERS_CACHE)) {
540 log.trace("Got a trigger for key {} : value {}", key, new_value);
541 final Node n = (Node) new_value;
542 // check if the node is local to this controller
543 ConnectionLocality locality = ConnectionLocality.NOT_LOCAL;
544 if(this.connectionManager != null) {
545 locality = this.connectionManager.getLocalityStatus(n);
547 if (locality == ConnectionLocality.LOCAL) {
548 log.trace("trigger for node {} processes locally", n);
549 // delete the trigger and proceed with handling the trigger
550 this.triggers.remove(key);
551 // this is a potentially long running task
552 // off load it from the listener thread
553 Runnable r = new Runnable() {
556 // the node is local.
557 // call the read service
558 if (reader != null) {
559 List<FlowOnNode> flows = reader.nonCachedReadAllFlows(n);
561 flowStatistics.put(n, flows);
566 // submit the runnable for execution
567 if(this.triggerExecutor != null) {
568 this.triggerExecutor.execute(r);
571 } else if (cacheName.equals(FLOW_STATISTICS_CACHE)) {
572 // flow statistics cache updated
574 log.trace("Got a flow statistics cache update for key {}", key);
575 // this is a short running task
576 // no need of off loading from the listener thread
577 final Node n = (Node) key;
578 // check if an outstanding trigger exists for this node
579 CountDownLatch l = this.latches.get(n);
581 // someone was waiting for this update
589 public void entryDeleted(Object key, String cacheName, boolean originLocal) {