Improve logging in NetconfClient, logging-bridge. Increase connection timeout in...
[controller.git] / opendaylight / statisticsmanager / implementation / src / main / java / org / opendaylight / controller / statisticsmanager / internal / StatisticsManager.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.statisticsmanager.internal;
11
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.EnumSet;
15 import java.util.HashMap;
16 import java.util.HashSet;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.Map.Entry;
20 import java.util.Set;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import org.opendaylight.controller.clustering.services.CacheConfigException;
30 import org.opendaylight.controller.clustering.services.CacheExistException;
31 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
32 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
33 import org.opendaylight.controller.clustering.services.IClusterServices;
34 import org.opendaylight.controller.connectionmanager.IConnectionManager;
35 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
36 import org.opendaylight.controller.sal.connection.ConnectionLocality;
37 import org.opendaylight.controller.sal.core.IContainer;
38 import org.opendaylight.controller.sal.core.Node;
39 import org.opendaylight.controller.sal.core.NodeConnector;
40 import org.opendaylight.controller.sal.core.NodeTable;
41 import org.opendaylight.controller.sal.core.Property;
42 import org.opendaylight.controller.sal.core.UpdateType;
43 import org.opendaylight.controller.sal.flowprogrammer.Flow;
44 import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates;
45 import org.opendaylight.controller.sal.reader.FlowOnNode;
46 import org.opendaylight.controller.sal.reader.IReadService;
47 import org.opendaylight.controller.sal.reader.IReadServiceListener;
48 import org.opendaylight.controller.sal.reader.NodeConnectorStatistics;
49 import org.opendaylight.controller.sal.reader.NodeDescription;
50 import org.opendaylight.controller.sal.reader.NodeTableStatistics;
51 import org.opendaylight.controller.sal.utils.ServiceHelper;
52 import org.opendaylight.controller.statisticsmanager.IStatisticsManager;
53 import org.opendaylight.controller.switchmanager.ISwitchManager;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 /**
58  * The class caches latest network nodes statistics as notified by reader
59  * services and provides API to retrieve them.
60  */
61 public class StatisticsManager implements IStatisticsManager, IReadServiceListener, IListenInventoryUpdates,
62     ICacheUpdateAware<Object,Object> {
63     private static final Logger log = LoggerFactory.getLogger(StatisticsManager.class);
64     private IContainer container;
65     private IClusterContainerServices clusterContainerService;
66     private IReadService reader;
67     private IConnectionManager connectionManager;
68     //statistics caches
69     private ConcurrentMap<Node, List<FlowOnNode>> flowStatistics;
70     private ConcurrentMap<Node, List<NodeConnectorStatistics>> nodeConnectorStatistics;
71     private ConcurrentMap<Node, List<NodeTableStatistics>> tableStatistics;
72     private ConcurrentMap<Node, NodeDescription> descriptionStatistics;
73
74     // data structure for latches
75     // this is not a cluster cache
76     private ConcurrentMap<Node, CountDownLatch> latches = new ConcurrentHashMap<Node, CountDownLatch>();
77     // 30 seconds is the timeout.
78     // the value of this can be tweaked based on performance tests.
79     private static long latchTimeout = 30;
80
81     // cache for flow stats refresh triggers
82     // an entry added to this map triggers the statistics manager
83     // to which the node is connected to get the latest flow stats from that node
84     // this is a cluster cache
85     private ConcurrentMap<Integer, Node> triggers;
86
87     // use an atomic integer for the triggers key
88     private AtomicInteger triggerKey = new AtomicInteger();
89
90     // single thread executor for the triggers
91     private ExecutorService triggerExecutor;
92
93     static final String TRIGGERS_CACHE = "statisticsmanager.triggers";
94     static final String FLOW_STATISTICS_CACHE = "statisticsmanager.flowStatistics";
95
96     private void nonClusterObjectCreate() {
97         flowStatistics = new ConcurrentHashMap<Node, List<FlowOnNode>>();
98         nodeConnectorStatistics = new ConcurrentHashMap<Node, List<NodeConnectorStatistics>>();
99         tableStatistics = new ConcurrentHashMap<Node, List<NodeTableStatistics>>();
100         descriptionStatistics = new ConcurrentHashMap<Node, NodeDescription>();
101         triggers = new ConcurrentHashMap<Integer, Node>();
102     }
103
104     private void allocateCaches() {
105         if (clusterContainerService == null) {
106             nonClusterObjectCreate();
107             log.error("Clustering service unavailable. Allocated non-cluster statistics manager cache.");
108             return;
109         }
110
111         try {
112             clusterContainerService.createCache(FLOW_STATISTICS_CACHE,
113                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
114             clusterContainerService.createCache("statisticsmanager.nodeConnectorStatistics",
115                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
116             clusterContainerService.createCache("statisticsmanager.tableStatistics",
117                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
118             clusterContainerService.createCache("statisticsmanager.descriptionStatistics",
119                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
120             clusterContainerService.createCache(TRIGGERS_CACHE,
121                     EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
122         } catch (CacheConfigException cce) {
123             log.error("Statistics cache configuration invalid - check cache mode");
124         } catch (CacheExistException ce) {
125             log.debug("Skipping statistics cache creation - already present");
126         }
127     }
128     @SuppressWarnings({ "unchecked" })
129     private void retrieveCaches() {
130         ConcurrentMap<?, ?> map;
131
132         if (this.clusterContainerService == null) {
133             log.warn("Can't retrieve statistics manager cache, Clustering service unavailable.");
134             return;
135         }
136
137         log.debug("Statistics Manager - retrieveCaches for Container {}", container);
138
139         map = clusterContainerService.getCache(FLOW_STATISTICS_CACHE);
140         if (map != null) {
141             this.flowStatistics = (ConcurrentMap<Node, List<FlowOnNode>>) map;
142         } else {
143             log.error("Cache allocation failed for statisticsmanager.flowStatistics in container {}", container.getName());
144         }
145
146         map = clusterContainerService.getCache("statisticsmanager.nodeConnectorStatistics");
147         if (map != null) {
148             this.nodeConnectorStatistics = (ConcurrentMap<Node, List<NodeConnectorStatistics>>) map;
149         } else {
150             log.error("Cache allocation failed for statisticsmanager.nodeConnectorStatistics in container {}", container.getName());
151         }
152
153         map = clusterContainerService.getCache("statisticsmanager.tableStatistics");
154         if (map != null) {
155             this.tableStatistics = (ConcurrentMap<Node, List<NodeTableStatistics>>) map;
156         } else {
157             log.error("Cache allocation failed for statisticsmanager.tableStatistics in container {}", container.getName());
158         }
159
160         map = clusterContainerService.getCache("statisticsmanager.descriptionStatistics");
161         if (map != null) {
162             this.descriptionStatistics = (ConcurrentMap<Node, NodeDescription>) map;
163         } else {
164             log.error("Cache allocation failed for statisticsmanager.descriptionStatistics in container {}", container.getName());
165         }
166
167         map = clusterContainerService.getCache(TRIGGERS_CACHE);
168         if (map != null) {
169             this.triggers = (ConcurrentMap<Integer, Node>) map;
170         } else {
171             log.error("Cache allocation failed for " + TRIGGERS_CACHE +" in container {}", container.getName());
172         }
173     }
174
175     /**
176      * Function called by the dependency manager when all the required
177      * dependencies are satisfied
178      *
179      */
180     void init() {
181         log.debug("INIT called!");
182         allocateCaches();
183         retrieveCaches();
184
185     }
186
187     /**
188      * Function called by the dependency manager when at least one
189      * dependency become unsatisfied or when the component is shutting
190      * down because for example bundle is being stopped.
191      *
192      */
193     void destroy() {
194         log.debug("DESTROY called!");
195     }
196
197     /**
198      * Function called by dependency manager after "init ()" is called
199      * and after the services provided by the class are registered in
200      * the service registry
201      *
202      */
203     void start() {
204         log.debug("START called!");
205         this.triggerExecutor = Executors.newSingleThreadExecutor();
206     }
207
208     /**
209      * Function called after registering the service in OSGi service registry.
210      */
211     void started(){
212         // Retrieve current statistics so we don't have to wait for next refresh
213         ISwitchManager switchManager = (ISwitchManager) ServiceHelper.getInstance(
214                 ISwitchManager.class, container.getName(), this);
215         if ((reader != null) && (switchManager != null)) {
216             Set<Node> nodeSet = switchManager.getNodes();
217             for (Node node : nodeSet) {
218                 List<FlowOnNode> flows = reader.readAllFlows(node);
219                 if (flows != null) {
220                     flowStatistics.put(node, flows);
221                 }
222                 NodeDescription descr = reader.readDescription(node);
223                 if (descr != null) {
224                     descriptionStatistics.put(node, descr);
225                 }
226                 List<NodeTableStatistics> tableStats = reader.readNodeTable(node);
227                 if (tableStats != null) {
228                     tableStatistics.put(node, tableStats);
229                 }
230                 List<NodeConnectorStatistics> ncStats = reader.readNodeConnectors(node);
231                 if (ncStats != null) {
232                     nodeConnectorStatistics.put(node, ncStats);
233                 }
234             }
235
236         } else {
237             log.trace("Failed to retrieve current statistics. Statistics will not be immediately available!");
238         }
239     }
240
241     /**
242      * Function called by the dependency manager before the services
243      * exported by the component are unregistered, this will be
244      * followed by a "destroy ()" calls
245      *
246      */
247     void stop() {
248         log.debug("STOP called!");
249         this.triggerExecutor.shutdownNow();
250     }
251
252     void setClusterContainerService(IClusterContainerServices s) {
253         log.debug("Cluster Service set for Statistics Mgr");
254         this.clusterContainerService = s;
255     }
256
257     void unsetClusterContainerService(IClusterContainerServices s) {
258         if (this.clusterContainerService == s) {
259             log.debug("Cluster Service removed for Statistics Mgr!");
260             this.clusterContainerService = null;
261         }
262     }
263     void setIContainer(IContainer c){
264         container = c;
265     }
266     public void unsetIContainer(IContainer s) {
267         if (this.container == s) {
268             this.container = null;
269         }
270     }
271
272     public void setReaderService(IReadService service) {
273         log.debug("Got inventory service set request {}", service);
274         this.reader = service;
275     }
276
277     public void unsetReaderService(IReadService service) {
278         log.debug("Got a service UNset request {}", service);
279         this.reader = null;
280     }
281
282     @Override
283     public List<FlowOnNode> getFlows(Node node) {
284         if (node == null) {
285             return Collections.emptyList();
286         }
287
288         List<FlowOnNode> flowList = new ArrayList<FlowOnNode>();
289         List<FlowOnNode> cachedList = flowStatistics.get(node);
290         if (cachedList != null){
291             flowList.addAll(cachedList);
292         }
293         return flowList;
294     }
295
296     /**
297      * {@inheritDoc}
298      */
299     @Override
300     public List<FlowOnNode> getFlowsNoCache(Node node) {
301         if (node == null) {
302             return Collections.emptyList();
303         }
304         // check if the node is local to this controller
305         ConnectionLocality locality = ConnectionLocality.LOCAL;
306         if(this.connectionManager != null) {
307             locality = this.connectionManager.getLocalityStatus(node);
308         }
309         if (locality == ConnectionLocality.NOT_LOCAL) {
310             // send a trigger to all and wait for either a response or timeout
311             CountDownLatch newLatch = new CountDownLatch(1);
312             CountDownLatch oldLatch = this.latches.putIfAbsent(node, newLatch);
313             this.triggers.put(this.triggerKey.incrementAndGet(), node);
314             try {
315                 boolean retStatus;
316                 if(oldLatch != null) {
317                     retStatus = oldLatch.await(this.latchTimeout, TimeUnit.SECONDS);
318                 } else {
319                     retStatus = newLatch.await(this.latchTimeout, TimeUnit.SECONDS);
320                 }
321                 // log the return code as it will give us, if
322                 // the latch timed out.
323                 log.debug("latch timed out {}", !retStatus);
324             } catch (InterruptedException e) {
325                 // log the error and move on
326                 log.warn("Waiting for statistics response interrupted", e);
327                 // restore the interrupt status
328                 // its a good practice to restore the interrupt status
329                 // if you are not propagating the InterruptedException
330                 Thread.currentThread().interrupt();
331             }
332             // now that the wait is over
333             // remove the latch entry
334             this.latches.remove(node);
335         } else {
336             // the node is local.
337             // call the read service
338             if (this.reader != null) {
339                 List<FlowOnNode> flows = reader.nonCachedReadAllFlows(node);
340                 if (flows != null) {
341                     nodeFlowStatisticsUpdated(node, flows);
342                 }
343             }
344         }
345         // at this point we are ready to return the cached value.
346         // this cached value will be up to date with a very high probability
347         // due to what we have done previously ie:- send a trigger for cache update
348         // or refreshed the cache if the node is local.
349         return getFlows(node);
350     }
351
352     @Override
353     public Map<Node, List<FlowOnNode>> getFlowStatisticsForFlowList(List<FlowEntry> flowList) {
354         Map<Node, List<FlowOnNode>> statMapOutput = new HashMap<Node, List<FlowOnNode>>();
355
356         if (flowList == null || flowList.isEmpty()){
357             return statMapOutput;
358         }
359
360         Node node;
361         // Index FlowEntries' flows by node so we don't traverse entire flow list for each flowEntry
362         Map<Node, Set<Flow>> index = new HashMap<Node, Set<Flow>>();
363         for (FlowEntry flowEntry : flowList) {
364             node = flowEntry.getNode();
365             Set<Flow> set = (index.containsKey(node) ? index.get(node) : new HashSet<Flow>());
366             set.add(flowEntry.getFlow());
367             index.put(node, set);
368         }
369
370         // Iterate over flows per indexed node and add to output
371         for (Entry<Node, Set<Flow>> indexEntry : index.entrySet()) {
372             node = indexEntry.getKey();
373             List<FlowOnNode> flowsPerNode = flowStatistics.get(node);
374
375             if (flowsPerNode != null && !flowsPerNode.isEmpty()){
376                 List<FlowOnNode> filteredFlows = statMapOutput.containsKey(node) ?
377                         statMapOutput.get(node) : new ArrayList<FlowOnNode>();
378
379                 for (FlowOnNode flowOnNode : flowsPerNode) {
380                     if (indexEntry.getValue().contains(flowOnNode.getFlow())) {
381                         filteredFlows.add(flowOnNode);
382                     }
383                 }
384                 statMapOutput.put(node, filteredFlows);
385             }
386         }
387         return statMapOutput;
388     }
389
390     @Override
391     public int getFlowsNumber(Node node) {
392         List<FlowOnNode> l;
393         if (node == null || (l = flowStatistics.get(node)) == null){
394             return -1;
395         }
396         return l.size();
397     }
398
399     @Override
400     public NodeDescription getNodeDescription(Node node) {
401         if (node == null){
402             return null;
403         }
404         NodeDescription nd = descriptionStatistics.get(node);
405         return nd != null? nd.clone() : null;
406     }
407
408     @Override
409     public NodeConnectorStatistics getNodeConnectorStatistics(NodeConnector nodeConnector) {
410         if (nodeConnector == null){
411             return null;
412         }
413
414         List<NodeConnectorStatistics> statList = nodeConnectorStatistics.get(nodeConnector.getNode());
415         if (statList != null){
416             for (NodeConnectorStatistics stat : statList) {
417                 if (stat.getNodeConnector().equals(nodeConnector)){
418                     return stat;
419                 }
420             }
421         }
422         return null;
423     }
424
425     @Override
426     public List<NodeConnectorStatistics> getNodeConnectorStatistics(Node node) {
427         if (node == null){
428             return Collections.emptyList();
429         }
430
431         List<NodeConnectorStatistics> statList = new ArrayList<NodeConnectorStatistics>();
432         List<NodeConnectorStatistics> cachedList = nodeConnectorStatistics.get(node);
433         if (cachedList != null) {
434             statList.addAll(cachedList);
435         }
436         return statList;
437     }
438
439     @Override
440     public NodeTableStatistics getNodeTableStatistics(NodeTable nodeTable) {
441         if (nodeTable == null){
442             return null;
443         }
444         List<NodeTableStatistics> statList = tableStatistics.get(nodeTable.getNode());
445         if (statList != null){
446             for (NodeTableStatistics stat : statList) {
447                 if (stat.getNodeTable().getID().equals(nodeTable.getID())){
448                     return stat;
449                 }
450             }
451         }
452         return null;
453     }
454
455     @Override
456     public List<NodeTableStatistics> getNodeTableStatistics(Node node){
457         if (node == null){
458             return Collections.emptyList();
459         }
460         List<NodeTableStatistics> statList = new ArrayList<NodeTableStatistics>();
461         List<NodeTableStatistics> cachedList = tableStatistics.get(node);
462         if (cachedList != null) {
463             statList.addAll(cachedList);
464         }
465         return statList;
466     }
467
468     @Override
469     public void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList) {
470         List<FlowOnNode> currentStat = this.flowStatistics.get(node);
471         // Update cache only if changed to avoid unnecessary cache sync operations
472         if (! flowStatsList.equals(currentStat)){
473             this.flowStatistics.put(node, flowStatsList);
474         }
475     }
476
477     @Override
478     public void nodeConnectorStatisticsUpdated(Node node, List<NodeConnectorStatistics> ncStatsList) {
479         List<NodeConnectorStatistics> currentStat = this.nodeConnectorStatistics.get(node);
480         if (! ncStatsList.equals(currentStat)){
481             this.nodeConnectorStatistics.put(node, ncStatsList);
482         }
483     }
484
485     @Override
486     public void nodeTableStatisticsUpdated(Node node, List<NodeTableStatistics> tableStatsList) {
487         List<NodeTableStatistics> currentStat = this.tableStatistics.get(node);
488         if (! tableStatsList.equals(currentStat)) {
489             this.tableStatistics.put(node, tableStatsList);
490         }
491     }
492
493     @Override
494     public void descriptionStatisticsUpdated(Node node, NodeDescription nodeDescription) {
495         NodeDescription currentDesc = this.descriptionStatistics.get(node);
496         if (! nodeDescription.equals(currentDesc)){
497             this.descriptionStatistics.put(node, nodeDescription);
498         }
499     }
500
501     @Override
502     public void updateNode(Node node, UpdateType type, Set<Property> props) {
503         // If node is removed, clean up stats mappings
504         if (type == UpdateType.REMOVED) {
505             flowStatistics.remove(node);
506             nodeConnectorStatistics.remove(node);
507             tableStatistics.remove(node);
508             descriptionStatistics.remove(node);
509         }
510     }
511
512     @Override
513     public void updateNodeConnector(NodeConnector nodeConnector, UpdateType type, Set<Property> props) {
514         // Not interested in this update
515     }
516
517     public void unsetIConnectionManager(IConnectionManager s) {
518         if (s == this.connectionManager) {
519             this.connectionManager = null;
520         }
521     }
522
523     public void setIConnectionManager(IConnectionManager s) {
524         this.connectionManager = s;
525     }
526
527     @Override
528     public void entryCreated(Object key, String cacheName, boolean originLocal) {
529         /*
530          * Do nothing
531          */
532     }
533
534     @Override
535     public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) {
536         if (originLocal) {
537             /*
538              * Local updates are of no interest
539              */
540             return;
541         }
542         if (cacheName.equals(TRIGGERS_CACHE)) {
543             log.trace("Got a trigger for key {} : value {}", key, new_value);
544             final Node n = (Node) new_value;
545             // check if the node is local to this controller
546             ConnectionLocality locality = ConnectionLocality.NOT_LOCAL;
547             if(this.connectionManager != null) {
548                 locality = this.connectionManager.getLocalityStatus(n);
549             }
550             if (locality == ConnectionLocality.LOCAL) {
551                 log.trace("trigger for node {} processes locally", n);
552                 // delete the trigger and proceed with handling the trigger
553                 this.triggers.remove(key);
554                 // this is a potentially long running task
555                 // off load it from the listener thread
556                 Runnable r = new Runnable() {
557                     @Override
558                     public void run() {
559                         // the node is local.
560                         // call the read service
561                         if (reader != null) {
562                             List<FlowOnNode> flows = reader.nonCachedReadAllFlows(n);
563                             if (flows != null) {
564                                 flowStatistics.put(n, flows);
565                             }
566                         }
567                     }
568                 };
569                 // submit the runnable for execution
570                 if(this.triggerExecutor != null) {
571                     this.triggerExecutor.execute(r);
572                 }
573             }
574         } else if (cacheName.equals(FLOW_STATISTICS_CACHE)) {
575             // flow statistics cache updated
576             // get the node
577             log.trace("Got a flow statistics cache update for key {}", key);
578             // this is a short running task
579             // no need of off loading from the listener thread
580             final Node n = (Node) key;
581             // check if an outstanding trigger exists for this node
582             CountDownLatch l = this.latches.get(n);
583             if(l != null) {
584                 // someone was waiting for this update
585                 // let him know
586                 l.countDown();
587             }
588         }
589     }
590
591     @Override
592     public void entryDeleted(Object key, String cacheName, boolean originLocal) {
593         /*
594          * Do nothing
595          */
596     }
597 }