Gerrit contains the following fixes:
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / StatisticsProvider.java
1 /*
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.statistics.manager;
9
10 import java.util.ArrayList;
11 import java.util.List;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.Future;
16
17 import org.eclipse.xtext.xbase.lib.Exceptions;
18 import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
19 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
20 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
21 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
22 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsOutput;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortInputBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortOutput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
66 import org.opendaylight.yangtools.concepts.Registration;
67 import org.opendaylight.yangtools.yang.binding.DataObject;
68 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
69 import org.opendaylight.yangtools.yang.binding.NotificationListener;
70 import org.opendaylight.yangtools.yang.common.RpcResult;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73
74 /** 
75  * Following are main responsibilities of the class:
76  * 1) Invoke statistics request thread to send periodic statistics request to all the 
77  * flow capable switch connected to the controller. It sends statistics request for 
78  * Group,Meter,Table,Flow,Queue,Aggregate stats.   
79  * 
80  * 2) Invoke statistics ager thread, to clean up all the stale statistics data from 
81  * operational data store.
82  * 
83  * @author avishnoi@in.ibm.com
84  *
85  */
86 public class StatisticsProvider implements AutoCloseable {
87
88     public final static Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
89     
90     private DataProviderService dps;
91     
92     private DataBrokerService dbs;
93
94     private NotificationProviderService nps;
95     
96     private OpendaylightGroupStatisticsService groupStatsService;
97     
98     private OpendaylightMeterStatisticsService meterStatsService;
99     
100     private OpendaylightFlowStatisticsService flowStatsService;
101     
102     private OpendaylightPortStatisticsService portStatsService;
103
104     private OpendaylightFlowTableStatisticsService flowTableStatsService;
105
106     private OpendaylightQueueStatisticsService queueStatsService;
107
108     private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
109     
110     private StatisticsUpdateHandler statsUpdateHandler;
111     
112     private Thread statisticsRequesterThread;
113     
114     private Thread statisticsAgerThread;
115
116     private final  InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
117     
118     public static final int STATS_THREAD_EXECUTION_TIME= 15000;
119     //Local caching of stats
120     
121     private final ConcurrentMap<NodeId,NodeStatisticsAger> statisticsCache = 
122             new ConcurrentHashMap<NodeId,NodeStatisticsAger>();
123     
124     public DataProviderService getDataService() {
125       return this.dps;
126     }
127     
128     public void setDataService(final DataProviderService dataService) {
129       this.dps = dataService;
130     }
131     
132     public DataBrokerService getDataBrokerService() {
133         return this.dbs;
134     }
135       
136     public void setDataBrokerService(final DataBrokerService dataBrokerService) {
137         this.dbs = dataBrokerService;
138     }
139
140     public NotificationProviderService getNotificationService() {
141       return this.nps;
142     }
143     
144     public void setNotificationService(final NotificationProviderService notificationService) {
145       this.nps = notificationService;
146     }
147
148     public MultipartMessageManager getMultipartMessageManager() {
149         return multipartMessageManager;
150     }
151
152     private final StatisticsUpdateCommiter updateCommiter = new StatisticsUpdateCommiter(StatisticsProvider.this);
153     
154     private Registration<NotificationListener> listenerRegistration;
155     
156     public void start() {
157         
158         NotificationProviderService nps = this.getNotificationService();
159         Registration<NotificationListener> registerNotificationListener = nps.registerNotificationListener(this.updateCommiter);
160         this.listenerRegistration = registerNotificationListener;
161         
162         statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
163         
164         registerDataStoreUpdateListener(this.getDataBrokerService());
165         
166         // Get Group/Meter statistics service instance
167         groupStatsService = StatisticsManagerActivator.getProviderContext().
168                 getRpcService(OpendaylightGroupStatisticsService.class);
169         
170         meterStatsService = StatisticsManagerActivator.getProviderContext().
171                 getRpcService(OpendaylightMeterStatisticsService.class);
172         
173         flowStatsService = StatisticsManagerActivator.getProviderContext().
174                 getRpcService(OpendaylightFlowStatisticsService.class);
175
176         portStatsService = StatisticsManagerActivator.getProviderContext().
177                 getRpcService(OpendaylightPortStatisticsService.class);
178
179         flowTableStatsService = StatisticsManagerActivator.getProviderContext().
180                 getRpcService(OpendaylightFlowTableStatisticsService.class);
181         
182         queueStatsService = StatisticsManagerActivator.getProviderContext().
183                 getRpcService(OpendaylightQueueStatisticsService.class);
184         
185         statisticsRequesterThread = new Thread( new Runnable(){
186
187             @Override
188             public void run() {
189                 while(true){
190                     try {
191                         statsRequestSender();
192                         
193                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
194                     }catch (Exception e){
195                         spLogger.error("Exception occurred while sending stats request : {}",e);
196                     }
197                 }
198             }
199         });
200         
201         spLogger.debug("Statistics requester thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
202         
203         statisticsRequesterThread.start();
204         
205         statisticsAgerThread = new Thread( new Runnable(){
206
207             @Override
208             public void run() {
209                 while(true){
210                     try {
211                         for(NodeStatisticsAger nodeStatisticsAger : statisticsCache.values()){
212                             nodeStatisticsAger.cleanStaleStatistics();
213                         }
214                         multipartMessageManager.cleanStaleTransactionIds();
215                         
216                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
217                     }catch (Exception e){
218                         spLogger.error("Exception occurred while sending stats request : {}",e);
219                     }
220                 }
221             }
222         });
223         
224         spLogger.debug("Statistics ager thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
225
226         statisticsAgerThread.start();
227         
228         spLogger.info("Statistics Provider started.");
229     }
230     
231     private void registerDataStoreUpdateListener(DataBrokerService dbs) {
232         //Register for Node updates
233         InstanceIdentifier<? extends DataObject> pathNode = InstanceIdentifier.builder(Nodes.class)
234                                                                         .child(Node.class).toInstance();
235         dbs.registerDataChangeListener(pathNode, statsUpdateHandler);
236
237         //Register for flow updates
238         InstanceIdentifier<? extends DataObject> pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class)
239                                                                     .augmentation(FlowCapableNode.class)
240                                                                     .child(Table.class)
241                                                                     .child(Flow.class).toInstance();
242         dbs.registerDataChangeListener(pathFlow, statsUpdateHandler);
243         
244         //Register for meter updates
245         InstanceIdentifier<? extends DataObject> pathMeter = InstanceIdentifier.builder(Nodes.class).child(Node.class)
246                                                     .augmentation(FlowCapableNode.class)
247                                                     .child(Meter.class).toInstance();
248
249         dbs.registerDataChangeListener(pathMeter, statsUpdateHandler);
250         
251         //Register for group updates 
252         InstanceIdentifier<? extends DataObject> pathGroup = InstanceIdentifier.builder(Nodes.class).child(Node.class)
253                                                     .augmentation(FlowCapableNode.class)
254                                                     .child(Group.class).toInstance();
255         dbs.registerDataChangeListener(pathGroup, statsUpdateHandler);
256
257         //Register for queue updates
258         InstanceIdentifier<? extends DataObject> pathQueue = InstanceIdentifier.builder(Nodes.class).child(Node.class)
259                                                                     .child(NodeConnector.class)
260                                                                     .augmentation(FlowCapableNodeConnector.class)
261                                                                     .child(Queue.class).toInstance();
262         dbs.registerDataChangeListener(pathQueue, statsUpdateHandler);
263     }
264
265     protected DataModificationTransaction startChange() {
266         
267         DataProviderService dps = this.getDataService();
268         return dps.beginTransaction();
269     }
270     
271     private void statsRequestSender(){
272         
273         List<Node> targetNodes = getAllConnectedNodes();
274         
275         if(targetNodes == null)
276             return;
277         
278
279         for (Node targetNode : targetNodes){
280             
281             if(targetNode.getAugmentation(FlowCapableNode.class) != null){
282                 sendStatisticsRequestsToNode(targetNode);
283             }
284         }
285     }
286     
287     public void sendStatisticsRequestsToNode(Node targetNode){
288         
289         spLogger.debug("Send requests for statistics collection to node : {})",targetNode.getId());
290         
291         InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
292         
293         NodeRef targetNodeRef = new NodeRef(targetInstanceId);
294     
295         try{
296             if(flowStatsService != null){
297                 sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
298                 sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
299             }
300             if(flowTableStatsService != null){
301                 sendAllFlowTablesStatisticsRequest(targetNodeRef);
302             }
303             if(portStatsService != null){
304                 sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
305             }
306             if(groupStatsService != null){
307                 sendAllGroupStatisticsRequest(targetNodeRef);
308                 sendGroupDescriptionRequest(targetNodeRef);
309             }
310             if(meterStatsService != null){
311                 sendAllMeterStatisticsRequest(targetNodeRef);
312                 sendMeterConfigStatisticsRequest(targetNodeRef);
313             }
314             if(queueStatsService != null){
315                 sendAllQueueStatsFromAllNodeConnector (targetNodeRef);
316             }
317         }catch(Exception e){
318             spLogger.error("Exception occured while sending statistics requests : {}", e);
319         }
320     }
321     
322
323     public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
324         final GetFlowTablesStatisticsInputBuilder input = 
325                 new GetFlowTablesStatisticsInputBuilder();
326         
327         input.setNode(targetNodeRef);
328
329         Future<RpcResult<GetFlowTablesStatisticsOutput>> response = 
330                 flowTableStatsService.getFlowTablesStatistics(input.build());
331
332         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNodeRef),response.get().getResult().getTransactionId()
333                 , StatsRequestType.ALL_FLOW_TABLE);
334
335     }
336
337     public void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
338         final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
339                 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
340         
341         input.setNode(targetNode);
342         
343         Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response = 
344                 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
345         
346         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
347                 , StatsRequestType.ALL_FLOW);
348         
349     }
350     
351     public void sendFlowStatsFromTableRequest(NodeRef targetNode,Flow flow) throws InterruptedException, ExecutionException{
352         final GetFlowStatisticsFromFlowTableInputBuilder input =
353                 new GetFlowStatisticsFromFlowTableInputBuilder();
354         
355         input.setNode(targetNode);
356         input.fieldsFrom(flow);
357         
358         Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response = 
359                 flowStatsService.getFlowStatisticsFromFlowTable(input.build());
360         
361         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
362                 , StatsRequestType.ALL_FLOW);
363         
364     }
365
366     public void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
367         
368         List<Short> tablesId = getTablesFromNode(targetNodeKey);
369         
370         if(tablesId.size() != 0){
371             for(Short id : tablesId){
372                 
373                 sendAggregateFlowsStatsFromTableRequest(targetNodeKey,id);
374             }
375         }else{
376             spLogger.debug("No details found in data store for flow tables associated with Node {}",targetNodeKey);
377         }
378     }
379     
380     public void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
381         
382         spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey);
383         GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = 
384                 new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
385                 
386         input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
387         input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId));
388         Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response = 
389                 flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
390                 
391         multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), tableId);
392         this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId()
393                 , StatsRequestType.AGGR_FLOW);
394     }
395
396     public void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
397         
398         final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
399         
400         input.setNode(targetNode);
401
402         Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response = 
403                 portStatsService.getAllNodeConnectorsStatistics(input.build());
404         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
405                 , StatsRequestType.ALL_PORT);
406
407     }
408
409     public void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
410         
411         final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
412         
413         input.setNode(targetNode);
414
415         Future<RpcResult<GetAllGroupStatisticsOutput>> response = 
416                 groupStatsService.getAllGroupStatistics(input.build());
417         
418         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
419                 , StatsRequestType.ALL_GROUP);
420
421     }
422     
423     public void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
424         final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
425         
426         input.setNode(targetNode);
427
428         Future<RpcResult<GetGroupDescriptionOutput>> response = 
429                 groupStatsService.getGroupDescription(input.build());
430
431         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
432                 , StatsRequestType.GROUP_DESC);
433
434     }
435     
436     public void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
437         
438         GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
439         
440         input.setNode(targetNode);
441
442         Future<RpcResult<GetAllMeterStatisticsOutput>> response = 
443                 meterStatsService.getAllMeterStatistics(input.build());
444         
445         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
446                 , StatsRequestType.ALL_METER);;
447
448     }
449     
450     public void sendMeterConfigStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
451         
452         GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
453         
454         input.setNode(targetNode);
455
456         Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response = 
457                 meterStatsService.getAllMeterConfigStatistics(input.build());
458         
459         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
460                 , StatsRequestType.METER_CONFIG);;
461
462     }
463     
464     public void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
465         GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
466         
467         input.setNode(targetNode);
468         
469         Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response = 
470                 queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
471         
472         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
473                 , StatsRequestType.ALL_QUEUE_STATS);;
474
475     }
476
477     public void sendQueueStatsFromGivenNodeConnector(NodeRef targetNode,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
478         GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder();
479         
480         input.setNode(targetNode);
481         input.setNodeConnectorId(nodeConnectorId);
482         input.setQueueId(queueId);
483         Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response = 
484                 queueStatsService.getQueueStatisticsFromGivenPort(input.build());
485         
486         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
487                 , StatsRequestType.ALL_QUEUE_STATS);;
488
489     }
490
491     public ConcurrentMap<NodeId, NodeStatisticsAger> getStatisticsCache() {
492         return statisticsCache;
493     }
494     
495     private List<Node> getAllConnectedNodes(){
496         
497         Nodes nodes = (Nodes) dps.readOperationalData(nodesIdentifier);
498         if(nodes == null)
499             return null;
500         
501         spLogger.debug("Number of connected nodes : {}",nodes.getNode().size());
502         return nodes.getNode();
503     }
504     
505     private List<Short> getTablesFromNode(NodeKey nodeKey){
506         InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
507         
508         FlowCapableNode node = (FlowCapableNode)dps.readOperationalData(nodesIdentifier);
509         List<Short> tablesId = new ArrayList<Short>();
510         if(node != null && node.getTable()!=null){
511             spLogger.debug("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
512             for(Table table: node.getTable()){
513                 tablesId.add(table.getId());
514             }
515         }
516         return tablesId;
517     }
518
519     @SuppressWarnings("unchecked")
520     private NodeId getNodeId(NodeRef nodeRef){
521         InstanceIdentifier<Node> nodeII = (InstanceIdentifier<Node>) nodeRef.getValue();
522         NodeKey nodeKey = InstanceIdentifier.keyOf(nodeII);
523         return nodeKey.getId();
524     }
525     
526     @SuppressWarnings("deprecation")
527     @Override
528     public void close(){
529         
530         try {
531             spLogger.info("Statistics Provider stopped.");
532             if (this.listenerRegistration != null) {
533               
534                 this.listenerRegistration.close();
535                 
536                 this.statisticsRequesterThread.destroy();
537                 
538                 this.statisticsAgerThread.destroy();
539             
540             }
541           } catch (Throwable e) {
542             throw Exceptions.sneakyThrow(e);
543           }
544     }
545
546 }