Added method for getting non-cached flow statistics
[controller.git] / opendaylight / statisticsmanager / implementation / src / main / java / org / opendaylight / controller / statisticsmanager / internal / StatisticsManager.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.statisticsmanager.internal;
11
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.EnumSet;
15 import java.util.HashMap;
16 import java.util.HashSet;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.Map.Entry;
20 import java.util.Set;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import org.opendaylight.controller.clustering.services.CacheConfigException;
30 import org.opendaylight.controller.clustering.services.CacheExistException;
31 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
32 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
33 import org.opendaylight.controller.clustering.services.IClusterServices;
34 import org.opendaylight.controller.connectionmanager.IConnectionManager;
35 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
36 import org.opendaylight.controller.sal.connection.ConnectionLocality;
37 import org.opendaylight.controller.sal.core.IContainer;
38 import org.opendaylight.controller.sal.core.Node;
39 import org.opendaylight.controller.sal.core.NodeConnector;
40 import org.opendaylight.controller.sal.core.NodeTable;
41 import org.opendaylight.controller.sal.core.Property;
42 import org.opendaylight.controller.sal.core.UpdateType;
43 import org.opendaylight.controller.sal.flowprogrammer.Flow;
44 import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates;
45 import org.opendaylight.controller.sal.reader.FlowOnNode;
46 import org.opendaylight.controller.sal.reader.IReadService;
47 import org.opendaylight.controller.sal.reader.IReadServiceListener;
48 import org.opendaylight.controller.sal.reader.NodeConnectorStatistics;
49 import org.opendaylight.controller.sal.reader.NodeDescription;
50 import org.opendaylight.controller.sal.reader.NodeTableStatistics;
51 import org.opendaylight.controller.sal.utils.ServiceHelper;
52 import org.opendaylight.controller.statisticsmanager.IStatisticsManager;
53 import org.opendaylight.controller.switchmanager.ISwitchManager;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 /**
58  * The class caches latest network nodes statistics as notified by reader
59  * services and provides API to retrieve them.
60  */
61 public class StatisticsManager implements IStatisticsManager, IReadServiceListener, IListenInventoryUpdates,
62     ICacheUpdateAware<Object,Object> {
63     private static final Logger log = LoggerFactory.getLogger(StatisticsManager.class);
64     private IContainer container;
65     private IClusterContainerServices clusterContainerService;
66     private IReadService reader;
67     private IConnectionManager connectionManager;
68     //statistics caches
69     private ConcurrentMap<Node, List<FlowOnNode>> flowStatistics;
70     private ConcurrentMap<Node, List<NodeConnectorStatistics>> nodeConnectorStatistics;
71     private ConcurrentMap<Node, List<NodeTableStatistics>> tableStatistics;
72     private ConcurrentMap<Node, NodeDescription> descriptionStatistics;
73
74     // data structure for latches
75     // this is not a cluster cache
76     private ConcurrentMap<Node, CountDownLatch> latches = new ConcurrentHashMap<Node, CountDownLatch>();
77     // 30 seconds is the timeout.
78     // the value of this can be tweaked based on performance tests.
79     private static long latchTimeout = 30;
80
81     // cache for flow stats refresh triggers
82     // an entry added to this map triggers the statistics manager
83     // to which the node is connected to get the latest flow stats from that node
84     // this is a cluster cache
85     private ConcurrentMap<Integer, Node> triggers;
86
87     // use an atomic integer for the triggers key
88     private AtomicInteger triggerKey = new AtomicInteger();
89
90     // single thread executor for the triggers
91     private ExecutorService triggerExecutor;
92
93     static final String TRIGGERS_CACHE = "statisticsmanager.triggers";
94     static final String FLOW_STATISTICS_CACHE = "statisticsmanager.flowStatistics";
95
96     private void nonClusterObjectCreate() {
97         flowStatistics = new ConcurrentHashMap<Node, List<FlowOnNode>>();
98         nodeConnectorStatistics = new ConcurrentHashMap<Node, List<NodeConnectorStatistics>>();
99         tableStatistics = new ConcurrentHashMap<Node, List<NodeTableStatistics>>();
100         descriptionStatistics = new ConcurrentHashMap<Node, NodeDescription>();
101         triggers = new ConcurrentHashMap<Integer, Node>();
102     }
103
104     @SuppressWarnings("deprecation")
105     private void allocateCaches() {
106         if (clusterContainerService == null) {
107             nonClusterObjectCreate();
108             log.error("Clustering service unavailable. Allocated non-cluster statistics manager cache.");
109             return;
110         }
111
112         try {
113             clusterContainerService.createCache(FLOW_STATISTICS_CACHE,
114                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
115             clusterContainerService.createCache("statisticsmanager.nodeConnectorStatistics",
116                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
117             clusterContainerService.createCache("statisticsmanager.tableStatistics",
118                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
119             clusterContainerService.createCache("statisticsmanager.descriptionStatistics",
120                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
121             clusterContainerService.createCache(TRIGGERS_CACHE,
122                     EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
123         } catch (CacheConfigException cce) {
124             log.error("Statistics cache configuration invalid - check cache mode");
125         } catch (CacheExistException ce) {
126             log.debug("Skipping statistics cache creation - already present");
127         }
128     }
129     @SuppressWarnings({ "unchecked", "deprecation" })
130     private void retrieveCaches() {
131         ConcurrentMap<?, ?> map;
132
133         if (this.clusterContainerService == null) {
134             log.warn("Can't retrieve statistics manager cache, Clustering service unavailable.");
135             return;
136         }
137
138         log.debug("Statistics Manager - retrieveCaches for Container {}", container);
139
140         map = clusterContainerService.getCache(FLOW_STATISTICS_CACHE);
141         if (map != null) {
142             this.flowStatistics = (ConcurrentMap<Node, List<FlowOnNode>>) map;
143         } else {
144             log.error("Cache allocation failed for statisticsmanager.flowStatistics in container {}", container.getName());
145         }
146
147         map = clusterContainerService.getCache("statisticsmanager.nodeConnectorStatistics");
148         if (map != null) {
149             this.nodeConnectorStatistics = (ConcurrentMap<Node, List<NodeConnectorStatistics>>) map;
150         } else {
151             log.error("Cache allocation failed for statisticsmanager.nodeConnectorStatistics in container {}", container.getName());
152         }
153
154         map = clusterContainerService.getCache("statisticsmanager.tableStatistics");
155         if (map != null) {
156             this.tableStatistics = (ConcurrentMap<Node, List<NodeTableStatistics>>) map;
157         } else {
158             log.error("Cache allocation failed for statisticsmanager.tableStatistics in container {}", container.getName());
159         }
160
161         map = clusterContainerService.getCache("statisticsmanager.descriptionStatistics");
162         if (map != null) {
163             this.descriptionStatistics = (ConcurrentMap<Node, NodeDescription>) map;
164         } else {
165             log.error("Cache allocation failed for statisticsmanager.descriptionStatistics in container {}", container.getName());
166         }
167
168         map = clusterContainerService.getCache(TRIGGERS_CACHE);
169         if (map != null) {
170             this.triggers = (ConcurrentMap<Integer, Node>) map;
171         } else {
172             log.error("Cache allocation failed for " + TRIGGERS_CACHE +" in container {}", container.getName());
173         }
174     }
175
176     /**
177      * Function called by the dependency manager when all the required
178      * dependencies are satisfied
179      *
180      */
181     void init() {
182         log.debug("INIT called!");
183         allocateCaches();
184         retrieveCaches();
185
186     }
187
188     /**
189      * Function called by the dependency manager when at least one
190      * dependency become unsatisfied or when the component is shutting
191      * down because for example bundle is being stopped.
192      *
193      */
194     void destroy() {
195         log.debug("DESTROY called!");
196     }
197
198     /**
199      * Function called by dependency manager after "init ()" is called
200      * and after the services provided by the class are registered in
201      * the service registry
202      *
203      */
204     void start() {
205         log.debug("START called!");
206         this.triggerExecutor = Executors.newSingleThreadExecutor();
207     }
208
209     /**
210      * Function called after registering the service in OSGi service registry.
211      */
212     void started(){
213         // Retrieve current statistics so we don't have to wait for next refresh
214         ISwitchManager switchManager = (ISwitchManager) ServiceHelper.getInstance(
215                 ISwitchManager.class, container.getName(), this);
216         if ((reader != null) && (switchManager != null)) {
217             Set<Node> nodeSet = switchManager.getNodes();
218             for (Node node : nodeSet) {
219                 List<FlowOnNode> flows = reader.readAllFlows(node);
220                 if (flows != null) {
221                     flowStatistics.put(node, flows);
222                 }
223                 NodeDescription descr = reader.readDescription(node);
224                 if (descr != null) {
225                     descriptionStatistics.put(node, descr);
226                 }
227                 List<NodeTableStatistics> tableStats = reader.readNodeTable(node);
228                 if (tableStats != null) {
229                     tableStatistics.put(node, tableStats);
230                 }
231                 List<NodeConnectorStatistics> ncStats = reader.readNodeConnectors(node);
232                 if (ncStats != null) {
233                     nodeConnectorStatistics.put(node, ncStats);
234                 }
235             }
236
237         } else {
238             log.trace("Failed to retrieve current statistics. Statistics will not be immediately available!");
239         }
240     }
241
242     /**
243      * Function called by the dependency manager before the services
244      * exported by the component are unregistered, this will be
245      * followed by a "destroy ()" calls
246      *
247      */
248     void stop() {
249         log.debug("STOP called!");
250         this.triggerExecutor.shutdownNow();
251     }
252
253     void setClusterContainerService(IClusterContainerServices s) {
254         log.debug("Cluster Service set for Statistics Mgr");
255         this.clusterContainerService = s;
256     }
257
258     void unsetClusterContainerService(IClusterContainerServices s) {
259         if (this.clusterContainerService == s) {
260             log.debug("Cluster Service removed for Statistics Mgr!");
261             this.clusterContainerService = null;
262         }
263     }
264     void setIContainer(IContainer c){
265         container = c;
266     }
267     public void unsetIContainer(IContainer s) {
268         if (this.container == s) {
269             this.container = null;
270         }
271     }
272
273     public void setReaderService(IReadService service) {
274         log.debug("Got inventory service set request {}", service);
275         this.reader = service;
276     }
277
278     public void unsetReaderService(IReadService service) {
279         log.debug("Got a service UNset request {}", service);
280         this.reader = null;
281     }
282
283     @Override
284     public List<FlowOnNode> getFlows(Node node) {
285         if (node == null) {
286             return Collections.emptyList();
287         }
288
289         List<FlowOnNode> flowList = new ArrayList<FlowOnNode>();
290         List<FlowOnNode> cachedList = flowStatistics.get(node);
291         if (cachedList != null){
292             flowList.addAll(cachedList);
293         }
294         return flowList;
295     }
296
297     /**
298      * {@inheritDoc}
299      */
300     @Override
301     public List<FlowOnNode> getFlowsNoCache(Node node) {
302         if (node == null) {
303             return Collections.emptyList();
304         }
305         // check if the node is local to this controller
306         ConnectionLocality locality = ConnectionLocality.LOCAL;
307         if(this.connectionManager != null) {
308             locality = this.connectionManager.getLocalityStatus(node);
309         }
310         if (locality == ConnectionLocality.NOT_LOCAL) {
311             // send a trigger to all and wait for either a response or timeout
312             CountDownLatch newLatch = new CountDownLatch(1);
313             CountDownLatch oldLatch = this.latches.putIfAbsent(node, newLatch);
314             this.triggers.put(this.triggerKey.incrementAndGet(), node);
315             try {
316                 boolean retStatus;
317                 if(oldLatch != null) {
318                     retStatus = oldLatch.await(this.latchTimeout, TimeUnit.SECONDS);
319                 } else {
320                     retStatus = newLatch.await(this.latchTimeout, TimeUnit.SECONDS);
321                 }
322                 // log the return code as it will give us, if
323                 // the latch timed out.
324                 log.debug("latch timed out {}", !retStatus);
325             } catch (InterruptedException e) {
326                 // log the error and move on
327                 log.warn("Waiting for statistics response interrupted", e);
328                 // restore the interrupt status
329                 // its a good practice to restore the interrupt status
330                 // if you are not propagating the InterruptedException
331                 Thread.currentThread().interrupt();
332             }
333             // now that the wait is over
334             // remove the latch entry
335             this.latches.remove(node);
336         } else {
337             // the node is local.
338             // call the read service
339             if (this.reader != null) {
340                 List<FlowOnNode> flows = reader.nonCachedReadAllFlows(node);
341                 if (flows != null) {
342                     nodeFlowStatisticsUpdated(node, flows);
343                 }
344             }
345         }
346         // at this point we are ready to return the cached value.
347         // this cached value will be up to date with a very high probability
348         // due to what we have done previously ie:- send a trigger for cache update
349         // or refreshed the cache if the node is local.
350         return getFlows(node);
351     }
352
353     @Override
354     public Map<Node, List<FlowOnNode>> getFlowStatisticsForFlowList(List<FlowEntry> flowList) {
355         Map<Node, List<FlowOnNode>> statMapOutput = new HashMap<Node, List<FlowOnNode>>();
356
357         if (flowList == null || flowList.isEmpty()){
358             return statMapOutput;
359         }
360
361         Node node;
362         // Index FlowEntries' flows by node so we don't traverse entire flow list for each flowEntry
363         Map<Node, Set<Flow>> index = new HashMap<Node, Set<Flow>>();
364         for (FlowEntry flowEntry : flowList) {
365             node = flowEntry.getNode();
366             Set<Flow> set = (index.containsKey(node) ? index.get(node) : new HashSet<Flow>());
367             set.add(flowEntry.getFlow());
368             index.put(node, set);
369         }
370
371         // Iterate over flows per indexed node and add to output
372         for (Entry<Node, Set<Flow>> indexEntry : index.entrySet()) {
373             node = indexEntry.getKey();
374             List<FlowOnNode> flowsPerNode = flowStatistics.get(node);
375
376             if (flowsPerNode != null && !flowsPerNode.isEmpty()){
377                 List<FlowOnNode> filteredFlows = statMapOutput.containsKey(node) ?
378                         statMapOutput.get(node) : new ArrayList<FlowOnNode>();
379
380                 for (FlowOnNode flowOnNode : flowsPerNode) {
381                     if (indexEntry.getValue().contains(flowOnNode.getFlow())) {
382                         filteredFlows.add(flowOnNode);
383                     }
384                 }
385                 statMapOutput.put(node, filteredFlows);
386             }
387         }
388         return statMapOutput;
389     }
390
391     @Override
392     public int getFlowsNumber(Node node) {
393         List<FlowOnNode> l;
394         if (node == null || (l = flowStatistics.get(node)) == null){
395             return -1;
396         }
397         return l.size();
398     }
399
400     @Override
401     public NodeDescription getNodeDescription(Node node) {
402         if (node == null){
403             return null;
404         }
405         NodeDescription nd = descriptionStatistics.get(node);
406         return nd != null? nd.clone() : null;
407     }
408
409     @Override
410     public NodeConnectorStatistics getNodeConnectorStatistics(NodeConnector nodeConnector) {
411         if (nodeConnector == null){
412             return null;
413         }
414
415         List<NodeConnectorStatistics> statList = nodeConnectorStatistics.get(nodeConnector.getNode());
416         if (statList != null){
417             for (NodeConnectorStatistics stat : statList) {
418                 if (stat.getNodeConnector().equals(nodeConnector)){
419                     return stat;
420                 }
421             }
422         }
423         return null;
424     }
425
426     @Override
427     public List<NodeConnectorStatistics> getNodeConnectorStatistics(Node node) {
428         if (node == null){
429             return null;
430         }
431
432         List<NodeConnectorStatistics> statList = new ArrayList<NodeConnectorStatistics>();
433         List<NodeConnectorStatistics> cachedList = nodeConnectorStatistics.get(node);
434         if (cachedList != null) {
435             statList.addAll(cachedList);
436         }
437         return statList;
438     }
439
440     @Override
441     public NodeTableStatistics getNodeTableStatistics(NodeTable nodeTable) {
442         if (nodeTable == null){
443             return null;
444         }
445         List<NodeTableStatistics> statList = tableStatistics.get(nodeTable.getNode());
446         if (statList != null){
447             for (NodeTableStatistics stat : statList) {
448                 if (stat.getNodeTable().getID().equals(nodeTable.getID())){
449                     return stat;
450                 }
451             }
452         }
453         return null;
454     }
455
456     @Override
457     public List<NodeTableStatistics> getNodeTableStatistics(Node node){
458         if (node == null){
459             return null;
460         }
461         List<NodeTableStatistics> statList = new ArrayList<NodeTableStatistics>();
462         List<NodeTableStatistics> cachedList = tableStatistics.get(node);
463         if (cachedList != null) {
464             statList.addAll(cachedList);
465         }
466         return statList;
467     }
468
469     @Override
470     public void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList) {
471         List<FlowOnNode> currentStat = this.flowStatistics.get(node);
472         // Update cache only if changed to avoid unnecessary cache sync operations
473         if (! flowStatsList.equals(currentStat)){
474             this.flowStatistics.put(node, flowStatsList);
475         }
476     }
477
478     @Override
479     public void nodeConnectorStatisticsUpdated(Node node, List<NodeConnectorStatistics> ncStatsList) {
480         List<NodeConnectorStatistics> currentStat = this.nodeConnectorStatistics.get(node);
481         if (! ncStatsList.equals(currentStat)){
482             this.nodeConnectorStatistics.put(node, ncStatsList);
483         }
484     }
485
486     @Override
487     public void nodeTableStatisticsUpdated(Node node, List<NodeTableStatistics> tableStatsList) {
488         List<NodeTableStatistics> currentStat = this.tableStatistics.get(node);
489         if (! tableStatsList.equals(currentStat)) {
490             this.tableStatistics.put(node, tableStatsList);
491         }
492     }
493
494     @Override
495     public void descriptionStatisticsUpdated(Node node, NodeDescription nodeDescription) {
496         NodeDescription currentDesc = this.descriptionStatistics.get(node);
497         if (! nodeDescription.equals(currentDesc)){
498             this.descriptionStatistics.put(node, nodeDescription);
499         }
500     }
501
502     @Override
503     public void updateNode(Node node, UpdateType type, Set<Property> props) {
504         // If node is removed, clean up stats mappings
505         if (type == UpdateType.REMOVED) {
506             flowStatistics.remove(node);
507             nodeConnectorStatistics.remove(node);
508             tableStatistics.remove(node);
509             descriptionStatistics.remove(node);
510         }
511     }
512
513     @Override
514     public void updateNodeConnector(NodeConnector nodeConnector, UpdateType type, Set<Property> props) {
515         // Not interested in this update
516     }
517
518     public void unsetIConnectionManager(IConnectionManager s) {
519         if (s == this.connectionManager) {
520             this.connectionManager = null;
521         }
522     }
523
524     public void setIConnectionManager(IConnectionManager s) {
525         this.connectionManager = s;
526     }
527
528     @Override
529     public void entryCreated(Object key, String cacheName, boolean originLocal) {
530         /*
531          * Do nothing
532          */
533     }
534
535     @Override
536     public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) {
537         if (originLocal) {
538             /*
539              * Local updates are of no interest
540              */
541             return;
542         }
543         if (cacheName.equals(TRIGGERS_CACHE)) {
544             log.trace("Got a trigger for key {} : value {}", key, new_value);
545             final Node n = (Node) new_value;
546             // check if the node is local to this controller
547             ConnectionLocality locality = ConnectionLocality.NOT_LOCAL;
548             if(this.connectionManager != null) {
549                 locality = this.connectionManager.getLocalityStatus(n);
550             }
551             if (locality == ConnectionLocality.LOCAL) {
552                 log.trace("trigger for node {} processes locally", n);
553                 // delete the trigger and proceed with handling the trigger
554                 this.triggers.remove(key);
555                 // this is a potentially long running task
556                 // off load it from the listener thread
557                 Runnable r = new Runnable() {
558                     @Override
559                     public void run() {
560                         // the node is local.
561                         // call the read service
562                         if (reader != null) {
563                             List<FlowOnNode> flows = reader.nonCachedReadAllFlows(n);
564                             if (flows != null) {
565                                 flowStatistics.put(n, flows);
566                             }
567                         }
568                     }
569                 };
570                 // submit the runnable for execution
571                 if(this.triggerExecutor != null) {
572                     this.triggerExecutor.execute(r);
573                 }
574             }
575         } else if (cacheName.equals(FLOW_STATISTICS_CACHE)) {
576             // flow statistics cache updated
577             // get the node
578             log.trace("Got a flow statistics cache update for key {}", key);
579             // this is a short running task
580             // no need of off loading from the listener thread
581             final Node n = (Node) key;
582             // check if an outstanding trigger exists for this node
583             CountDownLatch l = this.latches.get(n);
584             if(l != null) {
585                 // someone was waiting for this update
586                 // let him know
587                 l.countDown();
588             }
589         }
590     }
591
592     @Override
593     public void entryDeleted(Object key, String cacheName, boolean originLocal) {
594         /*
595          * Do nothing
596          */
597     }
598 }