Merge "Fix for NullPointerException"
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / StatisticsProvider.java
1 /*
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.statistics.manager;
9
10 import java.util.ArrayList;
11 import java.util.List;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.Future;
16
17 import org.eclipse.xtext.xbase.lib.Exceptions;
18 import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
19 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
20 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
21 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
22 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsOutput;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortInputBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortOutput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
66 import org.opendaylight.yangtools.concepts.Registration;
67 import org.opendaylight.yangtools.yang.binding.DataObject;
68 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
69 import org.opendaylight.yangtools.yang.binding.NotificationListener;
70 import org.opendaylight.yangtools.yang.common.RpcResult;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73
74 /** 
75  * Following are main responsibilities of the class:
76  * 1) Invoke statistics request thread to send periodic statistics request to all the 
77  * flow capable switch connected to the controller. It sends statistics request for 
78  * Group,Meter,Table,Flow,Queue,Aggregate stats.   
79  * 
80  * 2) Invoke statistics ager thread, to clean up all the stale statistics data from 
81  * operational data store.
82  * 
83  * @author avishnoi@in.ibm.com
84  *
85  */
86 public class StatisticsProvider implements AutoCloseable {
87
88     public final static Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
89     
90     private DataProviderService dps;
91     
92     private DataBrokerService dbs;
93
94     private NotificationProviderService nps;
95     
96     private OpendaylightGroupStatisticsService groupStatsService;
97     
98     private OpendaylightMeterStatisticsService meterStatsService;
99     
100     private OpendaylightFlowStatisticsService flowStatsService;
101     
102     private OpendaylightPortStatisticsService portStatsService;
103
104     private OpendaylightFlowTableStatisticsService flowTableStatsService;
105
106     private OpendaylightQueueStatisticsService queueStatsService;
107
108     private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
109     
110     private StatisticsUpdateHandler statsUpdateHandler;
111     
112     private Thread statisticsRequesterThread;
113     
114     private Thread statisticsAgerThread;
115
116     private final  InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
117     
118     public static final int STATS_THREAD_EXECUTION_TIME= 30000;
119     //Local caching of stats
120     
121     private final ConcurrentMap<NodeId,NodeStatisticsAger> statisticsCache = 
122             new ConcurrentHashMap<NodeId,NodeStatisticsAger>();
123     
124     public DataProviderService getDataService() {
125       return this.dps;
126     }
127     
128     public void setDataService(final DataProviderService dataService) {
129       this.dps = dataService;
130     }
131     
132     public DataBrokerService getDataBrokerService() {
133         return this.dbs;
134     }
135       
136     public void setDataBrokerService(final DataBrokerService dataBrokerService) {
137         this.dbs = dataBrokerService;
138     }
139
140     public NotificationProviderService getNotificationService() {
141       return this.nps;
142     }
143     
144     public void setNotificationService(final NotificationProviderService notificationService) {
145       this.nps = notificationService;
146     }
147
148     public MultipartMessageManager getMultipartMessageManager() {
149         return multipartMessageManager;
150     }
151
152     private final StatisticsUpdateCommiter updateCommiter = new StatisticsUpdateCommiter(StatisticsProvider.this);
153     
154     private Registration<NotificationListener> listenerRegistration;
155     
156     public void start() {
157         
158         NotificationProviderService nps = this.getNotificationService();
159         Registration<NotificationListener> registerNotificationListener = nps.registerNotificationListener(this.updateCommiter);
160         this.listenerRegistration = registerNotificationListener;
161         
162         statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
163         
164         registerDataStoreUpdateListener(this.getDataBrokerService());
165         
166         // Get Group/Meter statistics service instance
167         groupStatsService = StatisticsManagerActivator.getProviderContext().
168                 getRpcService(OpendaylightGroupStatisticsService.class);
169         
170         meterStatsService = StatisticsManagerActivator.getProviderContext().
171                 getRpcService(OpendaylightMeterStatisticsService.class);
172         
173         flowStatsService = StatisticsManagerActivator.getProviderContext().
174                 getRpcService(OpendaylightFlowStatisticsService.class);
175
176         portStatsService = StatisticsManagerActivator.getProviderContext().
177                 getRpcService(OpendaylightPortStatisticsService.class);
178
179         flowTableStatsService = StatisticsManagerActivator.getProviderContext().
180                 getRpcService(OpendaylightFlowTableStatisticsService.class);
181         
182         queueStatsService = StatisticsManagerActivator.getProviderContext().
183                 getRpcService(OpendaylightQueueStatisticsService.class);
184         
185         statisticsRequesterThread = new Thread( new Runnable(){
186
187             @Override
188             public void run() {
189                 while(true){
190                     try {
191                         statsRequestSender();
192                         
193                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
194                     }catch (Exception e){
195                         spLogger.error("Exception occurred while sending stats request : {}",e);
196                     }
197                 }
198             }
199         });
200         
201         spLogger.debug("Statistics requester thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
202         
203         statisticsRequesterThread.start();
204         
205         statisticsAgerThread = new Thread( new Runnable(){
206
207             @Override
208             public void run() {
209                 while(true){
210                     try {
211                         for(NodeStatisticsAger nodeStatisticsAger : statisticsCache.values()){
212                             nodeStatisticsAger.cleanStaleStatistics();
213                         }
214                         
215                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
216                     }catch (Exception e){
217                         spLogger.error("Exception occurred while sending stats request : {}",e);
218                     }
219                 }
220             }
221         });
222         
223         spLogger.debug("Statistics ager thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
224
225         statisticsAgerThread.start();
226         
227         spLogger.info("Statistics Provider started.");
228     }
229     
230     private void registerDataStoreUpdateListener(DataBrokerService dbs) {
231         //Register for flow updates
232         InstanceIdentifier<? extends DataObject> pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class)
233                                                                     .augmentation(FlowCapableNode.class)
234                                                                     .child(Table.class)
235                                                                     .child(Flow.class).toInstance();
236         dbs.registerDataChangeListener(pathFlow, statsUpdateHandler);
237         
238         //Register for meter updates
239         InstanceIdentifier<? extends DataObject> pathMeter = InstanceIdentifier.builder(Nodes.class).child(Node.class)
240                                                     .augmentation(FlowCapableNode.class)
241                                                     .child(Meter.class).toInstance();
242
243         dbs.registerDataChangeListener(pathMeter, statsUpdateHandler);
244         
245         //Register for group updates 
246         InstanceIdentifier<? extends DataObject> pathGroup = InstanceIdentifier.builder(Nodes.class).child(Node.class)
247                                                     .augmentation(FlowCapableNode.class)
248                                                     .child(Group.class).toInstance();
249         dbs.registerDataChangeListener(pathGroup, statsUpdateHandler);
250
251         //Register for queue updates
252         InstanceIdentifier<? extends DataObject> pathQueue = InstanceIdentifier.builder(Nodes.class).child(Node.class)
253                                                                     .child(NodeConnector.class)
254                                                                     .augmentation(FlowCapableNodeConnector.class)
255                                                                     .child(Queue.class).toInstance();
256         dbs.registerDataChangeListener(pathQueue, statsUpdateHandler);
257     }
258
259     protected DataModificationTransaction startChange() {
260         
261         DataProviderService dps = this.getDataService();
262         return dps.beginTransaction();
263     }
264     
265     private void statsRequestSender(){
266         
267         List<Node> targetNodes = getAllConnectedNodes();
268         
269         if(targetNodes == null)
270             return;
271         
272
273         for (Node targetNode : targetNodes){
274             
275             if(targetNode.getAugmentation(FlowCapableNode.class) != null){
276
277                 spLogger.info("Send request for stats collection to node : {})",targetNode.getId());
278                 
279                 InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
280                 
281                 NodeRef targetNodeRef = new NodeRef(targetInstanceId);
282             
283                 try{
284                     sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
285                 
286                     sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
287
288                     sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
289                 
290                     sendAllFlowTablesStatisticsRequest(targetNodeRef);
291                 
292                     sendAllQueueStatsFromAllNodeConnector (targetNodeRef);
293
294                     sendAllGroupStatisticsRequest(targetNodeRef);
295                     
296                     sendAllMeterStatisticsRequest(targetNodeRef);
297                     
298                     sendGroupDescriptionRequest(targetNodeRef);
299                     
300                     sendMeterConfigStatisticsRequest(targetNodeRef);
301                 }catch(Exception e){
302                     spLogger.error("Exception occured while sending statistics requests : {}", e);
303                 }
304             }
305         }
306     }
307
308     public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
309         final GetFlowTablesStatisticsInputBuilder input = 
310                 new GetFlowTablesStatisticsInputBuilder();
311         
312         input.setNode(targetNodeRef);
313
314         Future<RpcResult<GetFlowTablesStatisticsOutput>> response = 
315                 flowTableStatsService.getFlowTablesStatistics(input.build());
316
317         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
318                 , StatsRequestType.ALL_FLOW_TABLE);
319
320     }
321
322     public void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
323         final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
324                 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
325         
326         input.setNode(targetNode);
327         
328         Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response = 
329                 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
330         
331         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
332                 , StatsRequestType.ALL_FLOW);
333         
334     }
335     
336     public void sendFlowStatsFromTableRequest(NodeRef targetNode,Flow flow) throws InterruptedException, ExecutionException{
337         final GetFlowStatisticsFromFlowTableInputBuilder input =
338                 new GetFlowStatisticsFromFlowTableInputBuilder();
339         
340         input.setNode(targetNode);
341         input.fieldsFrom(flow);
342         
343         Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response = 
344                 flowStatsService.getFlowStatisticsFromFlowTable(input.build());
345         
346         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
347                 , StatsRequestType.ALL_FLOW);
348         
349     }
350
351     public void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
352         
353         List<Short> tablesId = getTablesFromNode(targetNodeKey);
354         
355         if(tablesId.size() != 0){
356             for(Short id : tablesId){
357                 
358                 spLogger.info("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey);
359                 GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = 
360                         new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
361                 
362                 input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
363                 input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(id));
364                 Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response = 
365                         flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
366                 
367                 multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), id);
368                 this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
369                         , StatsRequestType.AGGR_FLOW);
370             }
371         }else{
372             spLogger.debug("No details found in data store for flow tables associated with Node {}",targetNodeKey);
373         }
374     }
375
376     public void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
377         
378         final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
379         
380         input.setNode(targetNode);
381
382         Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response = 
383                 portStatsService.getAllNodeConnectorsStatistics(input.build());
384         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
385                 , StatsRequestType.ALL_PORT);
386
387     }
388
389     public void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
390         
391         final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
392         
393         input.setNode(targetNode);
394
395         Future<RpcResult<GetAllGroupStatisticsOutput>> response = 
396                 groupStatsService.getAllGroupStatistics(input.build());
397         
398         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
399                 , StatsRequestType.ALL_GROUP);
400
401     }
402     
403     public void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
404         final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
405         
406         input.setNode(targetNode);
407
408         Future<RpcResult<GetGroupDescriptionOutput>> response = 
409                 groupStatsService.getGroupDescription(input.build());
410
411         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
412                 , StatsRequestType.GROUP_DESC);
413
414     }
415     
416     public void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
417         
418         GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
419         
420         input.setNode(targetNode);
421
422         Future<RpcResult<GetAllMeterStatisticsOutput>> response = 
423                 meterStatsService.getAllMeterStatistics(input.build());
424         
425         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
426                 , StatsRequestType.ALL_METER);;
427
428     }
429     
430     public void sendMeterConfigStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
431         
432         GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
433         
434         input.setNode(targetNode);
435
436         Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response = 
437                 meterStatsService.getAllMeterConfigStatistics(input.build());
438         
439         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
440                 , StatsRequestType.METER_CONFIG);;
441
442     }
443     
444     public void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
445         GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
446         
447         input.setNode(targetNode);
448         
449         Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response = 
450                 queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
451         
452         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
453                 , StatsRequestType.ALL_QUEUE_STATS);;
454
455     }
456
457     public void sendQueueStatsFromGivenNodeConnector(NodeRef targetNode,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
458         GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder();
459         
460         input.setNode(targetNode);
461         input.setNodeConnectorId(nodeConnectorId);
462         input.setQueueId(queueId);
463         Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response = 
464                 queueStatsService.getQueueStatisticsFromGivenPort(input.build());
465         
466         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
467                 , StatsRequestType.ALL_QUEUE_STATS);;
468
469     }
470
471     public ConcurrentMap<NodeId, NodeStatisticsAger> getStatisticsCache() {
472         return statisticsCache;
473     }
474     
475     private List<Node> getAllConnectedNodes(){
476         
477         Nodes nodes = (Nodes) dps.readOperationalData(nodesIdentifier);
478         if(nodes == null)
479             return null;
480         
481         spLogger.info("Number of connected nodes : {}",nodes.getNode().size());
482         return nodes.getNode();
483     }
484     
485     private List<Short> getTablesFromNode(NodeKey nodeKey){
486         InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
487         
488         FlowCapableNode node = (FlowCapableNode)dps.readOperationalData(nodesIdentifier);
489         List<Short> tablesId = new ArrayList<Short>();
490         if(node != null && node.getTable()!=null){
491             spLogger.info("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
492             for(Table table: node.getTable()){
493                 tablesId.add(table.getId());
494             }
495         }
496         return tablesId;
497     }
498
499     @SuppressWarnings("deprecation")
500     @Override
501     public void close(){
502         
503         try {
504             spLogger.info("Statistics Provider stopped.");
505             if (this.listenerRegistration != null) {
506               
507                 this.listenerRegistration.close();
508                 
509                 this.statisticsRequesterThread.destroy();
510                 
511                 this.statisticsAgerThread.destroy();
512             
513             }
514           } catch (Throwable e) {
515             throw Exceptions.sneakyThrow(e);
516           }
517     }
518
519 }