3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.controller.statisticsmanager.internal;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.EnumSet;
15 import java.util.HashMap;
16 import java.util.HashSet;
17 import java.util.List;
19 import java.util.Map.Entry;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicInteger;
29 import org.opendaylight.controller.clustering.services.CacheConfigException;
30 import org.opendaylight.controller.clustering.services.CacheExistException;
31 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
32 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
33 import org.opendaylight.controller.clustering.services.IClusterServices;
34 import org.opendaylight.controller.connectionmanager.IConnectionManager;
35 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
36 import org.opendaylight.controller.sal.connection.ConnectionLocality;
37 import org.opendaylight.controller.sal.core.IContainer;
38 import org.opendaylight.controller.sal.core.Node;
39 import org.opendaylight.controller.sal.core.NodeConnector;
40 import org.opendaylight.controller.sal.core.NodeTable;
41 import org.opendaylight.controller.sal.core.Property;
42 import org.opendaylight.controller.sal.core.UpdateType;
43 import org.opendaylight.controller.sal.flowprogrammer.Flow;
44 import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates;
45 import org.opendaylight.controller.sal.reader.FlowOnNode;
46 import org.opendaylight.controller.sal.reader.IReadService;
47 import org.opendaylight.controller.sal.reader.IReadServiceListener;
48 import org.opendaylight.controller.sal.reader.NodeConnectorStatistics;
49 import org.opendaylight.controller.sal.reader.NodeDescription;
50 import org.opendaylight.controller.sal.reader.NodeTableStatistics;
51 import org.opendaylight.controller.sal.utils.ServiceHelper;
52 import org.opendaylight.controller.statisticsmanager.IStatisticsManager;
53 import org.opendaylight.controller.switchmanager.ISwitchManager;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
/**
 * Caches the latest network node statistics as notified by the reader
 * services and provides an API to retrieve them.
 */
61 public class StatisticsManager implements IStatisticsManager, IReadServiceListener, IListenInventoryUpdates,
62 ICacheUpdateAware<Object,Object> {
63 private static final Logger log = LoggerFactory.getLogger(StatisticsManager.class);
64 private IContainer container;
65 private IClusterContainerServices clusterContainerService;
66 private IReadService reader;
67 private IConnectionManager connectionManager;
69 private ConcurrentMap<Node, List<FlowOnNode>> flowStatistics;
70 private ConcurrentMap<Node, List<NodeConnectorStatistics>> nodeConnectorStatistics;
71 private ConcurrentMap<Node, List<NodeTableStatistics>> tableStatistics;
72 private ConcurrentMap<Node, NodeDescription> descriptionStatistics;
74 // data structure for latches
75 // this is not a cluster cache
76 private ConcurrentMap<Node, CountDownLatch> latches = new ConcurrentHashMap<Node, CountDownLatch>();
77 // 30 seconds is the timeout.
78 // the value of this can be tweaked based on performance tests.
79 private static long latchTimeout = 30;
81 // cache for flow stats refresh triggers
82 // an entry added to this map triggers the statistics manager
83 // to which the node is connected to get the latest flow stats from that node
84 // this is a cluster cache
85 private ConcurrentMap<Integer, Node> triggers;
87 // use an atomic integer for the triggers key
88 private AtomicInteger triggerKey = new AtomicInteger();
90 // single thread executor for the triggers
91 private ExecutorService triggerExecutor;
93 static final String TRIGGERS_CACHE = "statisticsmanager.triggers";
94 static final String FLOW_STATISTICS_CACHE = "statisticsmanager.flowStatistics";
96 private void nonClusterObjectCreate() {
97 flowStatistics = new ConcurrentHashMap<Node, List<FlowOnNode>>();
98 nodeConnectorStatistics = new ConcurrentHashMap<Node, List<NodeConnectorStatistics>>();
99 tableStatistics = new ConcurrentHashMap<Node, List<NodeTableStatistics>>();
100 descriptionStatistics = new ConcurrentHashMap<Node, NodeDescription>();
101 triggers = new ConcurrentHashMap<Integer, Node>();
104 @SuppressWarnings("deprecation")
105 private void allocateCaches() {
106 if (clusterContainerService == null) {
107 nonClusterObjectCreate();
108 log.error("Clustering service unavailable. Allocated non-cluster statistics manager cache.");
113 clusterContainerService.createCache(FLOW_STATISTICS_CACHE,
114 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
115 clusterContainerService.createCache("statisticsmanager.nodeConnectorStatistics",
116 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
117 clusterContainerService.createCache("statisticsmanager.tableStatistics",
118 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
119 clusterContainerService.createCache("statisticsmanager.descriptionStatistics",
120 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
121 clusterContainerService.createCache(TRIGGERS_CACHE,
122 EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
123 } catch (CacheConfigException cce) {
124 log.error("Statistics cache configuration invalid - check cache mode");
125 } catch (CacheExistException ce) {
126 log.debug("Skipping statistics cache creation - already present");
129 @SuppressWarnings({ "unchecked", "deprecation" })
130 private void retrieveCaches() {
131 ConcurrentMap<?, ?> map;
133 if (this.clusterContainerService == null) {
134 log.warn("Can't retrieve statistics manager cache, Clustering service unavailable.");
138 log.debug("Statistics Manager - retrieveCaches for Container {}", container);
140 map = clusterContainerService.getCache(FLOW_STATISTICS_CACHE);
142 this.flowStatistics = (ConcurrentMap<Node, List<FlowOnNode>>) map;
144 log.error("Cache allocation failed for statisticsmanager.flowStatistics in container {}", container.getName());
147 map = clusterContainerService.getCache("statisticsmanager.nodeConnectorStatistics");
149 this.nodeConnectorStatistics = (ConcurrentMap<Node, List<NodeConnectorStatistics>>) map;
151 log.error("Cache allocation failed for statisticsmanager.nodeConnectorStatistics in container {}", container.getName());
154 map = clusterContainerService.getCache("statisticsmanager.tableStatistics");
156 this.tableStatistics = (ConcurrentMap<Node, List<NodeTableStatistics>>) map;
158 log.error("Cache allocation failed for statisticsmanager.tableStatistics in container {}", container.getName());
161 map = clusterContainerService.getCache("statisticsmanager.descriptionStatistics");
163 this.descriptionStatistics = (ConcurrentMap<Node, NodeDescription>) map;
165 log.error("Cache allocation failed for statisticsmanager.descriptionStatistics in container {}", container.getName());
168 map = clusterContainerService.getCache(TRIGGERS_CACHE);
170 this.triggers = (ConcurrentMap<Integer, Node>) map;
172 log.error("Cache allocation failed for " + TRIGGERS_CACHE +" in container {}", container.getName());
177 * Function called by the dependency manager when all the required
178 * dependencies are satisfied
182 log.debug("INIT called!");
/**
 * Function called by the dependency manager when at least one
 * dependency becomes unsatisfied or when the component is shutting
 * down, for example because the bundle is being stopped.
 */
195 log.debug("DESTROY called!");
199 * Function called by dependency manager after "init ()" is called
200 * and after the services provided by the class are registered in
201 * the service registry
205 log.debug("START called!");
206 this.triggerExecutor = Executors.newSingleThreadExecutor();
210 * Function called after registering the service in OSGi service registry.
213 // Retrieve current statistics so we don't have to wait for next refresh
214 ISwitchManager switchManager = (ISwitchManager) ServiceHelper.getInstance(
215 ISwitchManager.class, container.getName(), this);
216 if ((reader != null) && (switchManager != null)) {
217 Set<Node> nodeSet = switchManager.getNodes();
218 for (Node node : nodeSet) {
219 List<FlowOnNode> flows = reader.readAllFlows(node);
221 flowStatistics.put(node, flows);
223 NodeDescription descr = reader.readDescription(node);
225 descriptionStatistics.put(node, descr);
227 List<NodeTableStatistics> tableStats = reader.readNodeTable(node);
228 if (tableStats != null) {
229 tableStatistics.put(node, tableStats);
231 List<NodeConnectorStatistics> ncStats = reader.readNodeConnectors(node);
232 if (ncStats != null) {
233 nodeConnectorStatistics.put(node, ncStats);
238 log.trace("Failed to retrieve current statistics. Statistics will not be immediately available!");
/**
 * Function called by the dependency manager before the services
 * exported by the component are unregistered; this will be
 * followed by a "destroy ()" call.
 */
249 log.debug("STOP called!");
250 this.triggerExecutor.shutdownNow();
253 void setClusterContainerService(IClusterContainerServices s) {
254 log.debug("Cluster Service set for Statistics Mgr");
255 this.clusterContainerService = s;
258 void unsetClusterContainerService(IClusterContainerServices s) {
259 if (this.clusterContainerService == s) {
260 log.debug("Cluster Service removed for Statistics Mgr!");
261 this.clusterContainerService = null;
264 void setIContainer(IContainer c){
267 public void unsetIContainer(IContainer s) {
268 if (this.container == s) {
269 this.container = null;
273 public void setReaderService(IReadService service) {
274 log.debug("Got inventory service set request {}", service);
275 this.reader = service;
278 public void unsetReaderService(IReadService service) {
279 log.debug("Got a service UNset request {}", service);
284 public List<FlowOnNode> getFlows(Node node) {
286 return Collections.emptyList();
289 List<FlowOnNode> flowList = new ArrayList<FlowOnNode>();
290 List<FlowOnNode> cachedList = flowStatistics.get(node);
291 if (cachedList != null){
292 flowList.addAll(cachedList);
301 public List<FlowOnNode> getFlowsNoCache(Node node) {
303 return Collections.emptyList();
305 // check if the node is local to this controller
306 ConnectionLocality locality = ConnectionLocality.LOCAL;
307 if(this.connectionManager != null) {
308 locality = this.connectionManager.getLocalityStatus(node);
310 if (locality == ConnectionLocality.NOT_LOCAL) {
311 // send a trigger to all and wait for either a response or timeout
312 CountDownLatch newLatch = new CountDownLatch(1);
313 CountDownLatch oldLatch = this.latches.putIfAbsent(node, newLatch);
314 this.triggers.put(this.triggerKey.incrementAndGet(), node);
317 if(oldLatch != null) {
318 retStatus = oldLatch.await(this.latchTimeout, TimeUnit.SECONDS);
320 retStatus = newLatch.await(this.latchTimeout, TimeUnit.SECONDS);
322 // log the return code as it will give us, if
323 // the latch timed out.
324 log.debug("latch timed out {}", !retStatus);
325 } catch (InterruptedException e) {
326 // log the error and move on
327 log.warn("Waiting for statistics response interrupted", e);
328 // restore the interrupt status
329 // its a good practice to restore the interrupt status
330 // if you are not propagating the InterruptedException
331 Thread.currentThread().interrupt();
333 // now that the wait is over
334 // remove the latch entry
335 this.latches.remove(node);
337 // the node is local.
338 // call the read service
339 if (this.reader != null) {
340 List<FlowOnNode> flows = reader.nonCachedReadAllFlows(node);
342 nodeFlowStatisticsUpdated(node, flows);
346 // at this point we are ready to return the cached value.
347 // this cached value will be up to date with a very high probability
348 // due to what we have done previously ie:- send a trigger for cache update
349 // or refreshed the cache if the node is local.
350 return getFlows(node);
354 public Map<Node, List<FlowOnNode>> getFlowStatisticsForFlowList(List<FlowEntry> flowList) {
355 Map<Node, List<FlowOnNode>> statMapOutput = new HashMap<Node, List<FlowOnNode>>();
357 if (flowList == null || flowList.isEmpty()){
358 return statMapOutput;
362 // Index FlowEntries' flows by node so we don't traverse entire flow list for each flowEntry
363 Map<Node, Set<Flow>> index = new HashMap<Node, Set<Flow>>();
364 for (FlowEntry flowEntry : flowList) {
365 node = flowEntry.getNode();
366 Set<Flow> set = (index.containsKey(node) ? index.get(node) : new HashSet<Flow>());
367 set.add(flowEntry.getFlow());
368 index.put(node, set);
371 // Iterate over flows per indexed node and add to output
372 for (Entry<Node, Set<Flow>> indexEntry : index.entrySet()) {
373 node = indexEntry.getKey();
374 List<FlowOnNode> flowsPerNode = flowStatistics.get(node);
376 if (flowsPerNode != null && !flowsPerNode.isEmpty()){
377 List<FlowOnNode> filteredFlows = statMapOutput.containsKey(node) ?
378 statMapOutput.get(node) : new ArrayList<FlowOnNode>();
380 for (FlowOnNode flowOnNode : flowsPerNode) {
381 if (indexEntry.getValue().contains(flowOnNode.getFlow())) {
382 filteredFlows.add(flowOnNode);
385 statMapOutput.put(node, filteredFlows);
388 return statMapOutput;
392 public int getFlowsNumber(Node node) {
394 if (node == null || (l = flowStatistics.get(node)) == null){
401 public NodeDescription getNodeDescription(Node node) {
405 NodeDescription nd = descriptionStatistics.get(node);
406 return nd != null? nd.clone() : null;
410 public NodeConnectorStatistics getNodeConnectorStatistics(NodeConnector nodeConnector) {
411 if (nodeConnector == null){
415 List<NodeConnectorStatistics> statList = nodeConnectorStatistics.get(nodeConnector.getNode());
416 if (statList != null){
417 for (NodeConnectorStatistics stat : statList) {
418 if (stat.getNodeConnector().equals(nodeConnector)){
427 public List<NodeConnectorStatistics> getNodeConnectorStatistics(Node node) {
432 List<NodeConnectorStatistics> statList = new ArrayList<NodeConnectorStatistics>();
433 List<NodeConnectorStatistics> cachedList = nodeConnectorStatistics.get(node);
434 if (cachedList != null) {
435 statList.addAll(cachedList);
441 public NodeTableStatistics getNodeTableStatistics(NodeTable nodeTable) {
442 if (nodeTable == null){
445 List<NodeTableStatistics> statList = tableStatistics.get(nodeTable.getNode());
446 if (statList != null){
447 for (NodeTableStatistics stat : statList) {
448 if (stat.getNodeTable().getID().equals(nodeTable.getID())){
457 public List<NodeTableStatistics> getNodeTableStatistics(Node node){
461 List<NodeTableStatistics> statList = new ArrayList<NodeTableStatistics>();
462 List<NodeTableStatistics> cachedList = tableStatistics.get(node);
463 if (cachedList != null) {
464 statList.addAll(cachedList);
470 public void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList) {
471 List<FlowOnNode> currentStat = this.flowStatistics.get(node);
472 // Update cache only if changed to avoid unnecessary cache sync operations
473 if (! flowStatsList.equals(currentStat)){
474 this.flowStatistics.put(node, flowStatsList);
479 public void nodeConnectorStatisticsUpdated(Node node, List<NodeConnectorStatistics> ncStatsList) {
480 List<NodeConnectorStatistics> currentStat = this.nodeConnectorStatistics.get(node);
481 if (! ncStatsList.equals(currentStat)){
482 this.nodeConnectorStatistics.put(node, ncStatsList);
487 public void nodeTableStatisticsUpdated(Node node, List<NodeTableStatistics> tableStatsList) {
488 List<NodeTableStatistics> currentStat = this.tableStatistics.get(node);
489 if (! tableStatsList.equals(currentStat)) {
490 this.tableStatistics.put(node, tableStatsList);
495 public void descriptionStatisticsUpdated(Node node, NodeDescription nodeDescription) {
496 NodeDescription currentDesc = this.descriptionStatistics.get(node);
497 if (! nodeDescription.equals(currentDesc)){
498 this.descriptionStatistics.put(node, nodeDescription);
503 public void updateNode(Node node, UpdateType type, Set<Property> props) {
504 // If node is removed, clean up stats mappings
505 if (type == UpdateType.REMOVED) {
506 flowStatistics.remove(node);
507 nodeConnectorStatistics.remove(node);
508 tableStatistics.remove(node);
509 descriptionStatistics.remove(node);
514 public void updateNodeConnector(NodeConnector nodeConnector, UpdateType type, Set<Property> props) {
515 // Not interested in this update
518 public void unsetIConnectionManager(IConnectionManager s) {
519 if (s == this.connectionManager) {
520 this.connectionManager = null;
524 public void setIConnectionManager(IConnectionManager s) {
525 this.connectionManager = s;
529 public void entryCreated(Object key, String cacheName, boolean originLocal) {
536 public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) {
539 * Local updates are of no interest
543 if (cacheName.equals(TRIGGERS_CACHE)) {
544 log.trace("Got a trigger for key {} : value {}", key, new_value);
545 final Node n = (Node) new_value;
546 // check if the node is local to this controller
547 ConnectionLocality locality = ConnectionLocality.NOT_LOCAL;
548 if(this.connectionManager != null) {
549 locality = this.connectionManager.getLocalityStatus(n);
551 if (locality == ConnectionLocality.LOCAL) {
552 log.trace("trigger for node {} processes locally", n);
553 // delete the trigger and proceed with handling the trigger
554 this.triggers.remove(key);
555 // this is a potentially long running task
556 // off load it from the listener thread
557 Runnable r = new Runnable() {
560 // the node is local.
561 // call the read service
562 if (reader != null) {
563 List<FlowOnNode> flows = reader.nonCachedReadAllFlows(n);
565 flowStatistics.put(n, flows);
570 // submit the runnable for execution
571 if(this.triggerExecutor != null) {
572 this.triggerExecutor.execute(r);
575 } else if (cacheName.equals(FLOW_STATISTICS_CACHE)) {
576 // flow statistics cache updated
578 log.trace("Got a flow statistics cache update for key {}", key);
579 // this is a short running task
580 // no need of off loading from the listener thread
581 final Node n = (Node) key;
582 // check if an outstanding trigger exists for this node
583 CountDownLatch l = this.latches.get(n);
585 // someone was waiting for this update
593 public void entryDeleted(Object key, String cacheName, boolean originLocal) {