Gerrit contains the following minor enhancements:
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / StatisticsProvider.java
1 /*
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.statistics.manager;
9
10 import java.util.ArrayList;
11 import java.util.List;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.Future;
16
17 import org.eclipse.xtext.xbase.lib.Exceptions;
18 import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
19 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
20 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
21 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
22 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsOutput;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortInputBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortOutput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
66 import org.opendaylight.yangtools.concepts.Registration;
67 import org.opendaylight.yangtools.yang.binding.DataObject;
68 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
69 import org.opendaylight.yangtools.yang.binding.NotificationListener;
70 import org.opendaylight.yangtools.yang.common.RpcResult;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73
74 /** 
75  * Following are main responsibilities of the class:
76  * 1) Invoke statistics request thread to send periodic statistics request to all the 
77  * flow capable switch connected to the controller. It sends statistics request for 
78  * Group,Meter,Table,Flow,Queue,Aggregate stats.   
79  * 
80  * 2) Invoke statistics ager thread, to clean up all the stale statistics data from 
81  * operational data store.
82  * 
83  * @author avishnoi@in.ibm.com
84  *
85  */
86 public class StatisticsProvider implements AutoCloseable {
87
88     public final static Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
89     
90     private DataProviderService dps;
91     
92     private DataBrokerService dbs;
93
94     private NotificationProviderService nps;
95     
96     private OpendaylightGroupStatisticsService groupStatsService;
97     
98     private OpendaylightMeterStatisticsService meterStatsService;
99     
100     private OpendaylightFlowStatisticsService flowStatsService;
101     
102     private OpendaylightPortStatisticsService portStatsService;
103
104     private OpendaylightFlowTableStatisticsService flowTableStatsService;
105
106     private OpendaylightQueueStatisticsService queueStatsService;
107
108     private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
109     
110     private StatisticsUpdateHandler statsUpdateHandler;
111     
112     private Thread statisticsRequesterThread;
113     
114     private Thread statisticsAgerThread;
115
116     private final  InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
117     
118     public static final int STATS_THREAD_EXECUTION_TIME= 15000;
119     //Local caching of stats
120     
121     private final ConcurrentMap<NodeId,NodeStatisticsAger> statisticsCache = 
122             new ConcurrentHashMap<NodeId,NodeStatisticsAger>();
123     
124     public DataProviderService getDataService() {
125       return this.dps;
126     }
127     
128     public void setDataService(final DataProviderService dataService) {
129       this.dps = dataService;
130     }
131     
132     public DataBrokerService getDataBrokerService() {
133         return this.dbs;
134     }
135       
136     public void setDataBrokerService(final DataBrokerService dataBrokerService) {
137         this.dbs = dataBrokerService;
138     }
139
140     public NotificationProviderService getNotificationService() {
141       return this.nps;
142     }
143     
144     public void setNotificationService(final NotificationProviderService notificationService) {
145       this.nps = notificationService;
146     }
147
148     public MultipartMessageManager getMultipartMessageManager() {
149         return multipartMessageManager;
150     }
151
152     private final StatisticsUpdateCommiter updateCommiter = new StatisticsUpdateCommiter(StatisticsProvider.this);
153     
154     private Registration<NotificationListener> listenerRegistration;
155     
156     public void start() {
157         
158         NotificationProviderService nps = this.getNotificationService();
159         Registration<NotificationListener> registerNotificationListener = nps.registerNotificationListener(this.updateCommiter);
160         this.listenerRegistration = registerNotificationListener;
161         
162         statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
163         
164         registerDataStoreUpdateListener(this.getDataBrokerService());
165         
166         // Get Group/Meter statistics service instance
167         groupStatsService = StatisticsManagerActivator.getProviderContext().
168                 getRpcService(OpendaylightGroupStatisticsService.class);
169         
170         meterStatsService = StatisticsManagerActivator.getProviderContext().
171                 getRpcService(OpendaylightMeterStatisticsService.class);
172         
173         flowStatsService = StatisticsManagerActivator.getProviderContext().
174                 getRpcService(OpendaylightFlowStatisticsService.class);
175
176         portStatsService = StatisticsManagerActivator.getProviderContext().
177                 getRpcService(OpendaylightPortStatisticsService.class);
178
179         flowTableStatsService = StatisticsManagerActivator.getProviderContext().
180                 getRpcService(OpendaylightFlowTableStatisticsService.class);
181         
182         queueStatsService = StatisticsManagerActivator.getProviderContext().
183                 getRpcService(OpendaylightQueueStatisticsService.class);
184         
185         statisticsRequesterThread = new Thread( new Runnable(){
186
187             @Override
188             public void run() {
189                 while(true){
190                     try {
191                         statsRequestSender();
192                         
193                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
194                     }catch (Exception e){
195                         spLogger.error("Exception occurred while sending stats request : {}",e);
196                     }
197                 }
198             }
199         });
200         
201         spLogger.debug("Statistics requester thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
202         
203         statisticsRequesterThread.start();
204         
205         statisticsAgerThread = new Thread( new Runnable(){
206
207             @Override
208             public void run() {
209                 while(true){
210                     try {
211                         for(NodeStatisticsAger nodeStatisticsAger : statisticsCache.values()){
212                             nodeStatisticsAger.cleanStaleStatistics();
213                         }
214                         multipartMessageManager.cleanStaleTransactionIds();
215                         
216                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
217                     }catch (Exception e){
218                         spLogger.error("Exception occurred while sending stats request : {}",e);
219                     }
220                 }
221             }
222         });
223         
224         spLogger.debug("Statistics ager thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
225
226         statisticsAgerThread.start();
227         
228         spLogger.info("Statistics Provider started.");
229     }
230     
231     private void registerDataStoreUpdateListener(DataBrokerService dbs) {
232         //Register for Node updates
233         InstanceIdentifier<? extends DataObject> pathNode = InstanceIdentifier.builder(Nodes.class)
234                                                                         .child(Node.class).toInstance();
235         dbs.registerDataChangeListener(pathNode, statsUpdateHandler);
236
237         //Register for flow updates
238         InstanceIdentifier<? extends DataObject> pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class)
239                                                                     .augmentation(FlowCapableNode.class)
240                                                                     .child(Table.class)
241                                                                     .child(Flow.class).toInstance();
242         dbs.registerDataChangeListener(pathFlow, statsUpdateHandler);
243         
244         //Register for meter updates
245         InstanceIdentifier<? extends DataObject> pathMeter = InstanceIdentifier.builder(Nodes.class).child(Node.class)
246                                                     .augmentation(FlowCapableNode.class)
247                                                     .child(Meter.class).toInstance();
248
249         dbs.registerDataChangeListener(pathMeter, statsUpdateHandler);
250         
251         //Register for group updates 
252         InstanceIdentifier<? extends DataObject> pathGroup = InstanceIdentifier.builder(Nodes.class).child(Node.class)
253                                                     .augmentation(FlowCapableNode.class)
254                                                     .child(Group.class).toInstance();
255         dbs.registerDataChangeListener(pathGroup, statsUpdateHandler);
256
257         //Register for queue updates
258         InstanceIdentifier<? extends DataObject> pathQueue = InstanceIdentifier.builder(Nodes.class).child(Node.class)
259                                                                     .child(NodeConnector.class)
260                                                                     .augmentation(FlowCapableNodeConnector.class)
261                                                                     .child(Queue.class).toInstance();
262         dbs.registerDataChangeListener(pathQueue, statsUpdateHandler);
263     }
264
265     protected DataModificationTransaction startChange() {
266         
267         DataProviderService dps = this.getDataService();
268         return dps.beginTransaction();
269     }
270     
271     private void statsRequestSender(){
272         
273         List<Node> targetNodes = getAllConnectedNodes();
274         
275         if(targetNodes == null)
276             return;
277         
278
279         for (Node targetNode : targetNodes){
280             
281             if(targetNode.getAugmentation(FlowCapableNode.class) != null){
282                 sendStatisticsRequestsToNode(targetNode);
283             }
284         }
285     }
286     
287     public void sendStatisticsRequestsToNode(Node targetNode){
288         
289         spLogger.debug("Send requests for statistics collection to node : {})",targetNode.getId());
290         
291         InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
292         
293         NodeRef targetNodeRef = new NodeRef(targetInstanceId);
294     
295         try{
296             sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
297         
298             sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
299
300             sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
301         
302             sendAllFlowTablesStatisticsRequest(targetNodeRef);
303         
304             sendAllQueueStatsFromAllNodeConnector (targetNodeRef);
305
306             sendAllGroupStatisticsRequest(targetNodeRef);
307             
308             sendAllMeterStatisticsRequest(targetNodeRef);
309             
310             sendGroupDescriptionRequest(targetNodeRef);
311             
312             sendMeterConfigStatisticsRequest(targetNodeRef);
313         }catch(Exception e){
314             spLogger.error("Exception occured while sending statistics requests : {}", e);
315         }
316     }
317     
318
319     public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
320         final GetFlowTablesStatisticsInputBuilder input = 
321                 new GetFlowTablesStatisticsInputBuilder();
322         
323         input.setNode(targetNodeRef);
324
325         Future<RpcResult<GetFlowTablesStatisticsOutput>> response = 
326                 flowTableStatsService.getFlowTablesStatistics(input.build());
327
328         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNodeRef),response.get().getResult().getTransactionId()
329                 , StatsRequestType.ALL_FLOW_TABLE);
330
331     }
332
333     public void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
334         final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
335                 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
336         
337         input.setNode(targetNode);
338         
339         Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response = 
340                 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
341         
342         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
343                 , StatsRequestType.ALL_FLOW);
344         
345     }
346     
347     public void sendFlowStatsFromTableRequest(NodeRef targetNode,Flow flow) throws InterruptedException, ExecutionException{
348         final GetFlowStatisticsFromFlowTableInputBuilder input =
349                 new GetFlowStatisticsFromFlowTableInputBuilder();
350         
351         input.setNode(targetNode);
352         input.fieldsFrom(flow);
353         
354         Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response = 
355                 flowStatsService.getFlowStatisticsFromFlowTable(input.build());
356         
357         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
358                 , StatsRequestType.ALL_FLOW);
359         
360     }
361
362     public void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
363         
364         List<Short> tablesId = getTablesFromNode(targetNodeKey);
365         
366         if(tablesId.size() != 0){
367             for(Short id : tablesId){
368                 
369                 spLogger.debug("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey);
370                 GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = 
371                         new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
372                 
373                 input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
374                 input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(id));
375                 Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response = 
376                         flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
377                 
378                 multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), id);
379                 this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId()
380                         , StatsRequestType.AGGR_FLOW);
381             }
382         }else{
383             spLogger.debug("No details found in data store for flow tables associated with Node {}",targetNodeKey);
384         }
385     }
386
387     public void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
388         
389         final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
390         
391         input.setNode(targetNode);
392
393         Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response = 
394                 portStatsService.getAllNodeConnectorsStatistics(input.build());
395         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
396                 , StatsRequestType.ALL_PORT);
397
398     }
399
400     public void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
401         
402         final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
403         
404         input.setNode(targetNode);
405
406         Future<RpcResult<GetAllGroupStatisticsOutput>> response = 
407                 groupStatsService.getAllGroupStatistics(input.build());
408         
409         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
410                 , StatsRequestType.ALL_GROUP);
411
412     }
413     
414     public void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
415         final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
416         
417         input.setNode(targetNode);
418
419         Future<RpcResult<GetGroupDescriptionOutput>> response = 
420                 groupStatsService.getGroupDescription(input.build());
421
422         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
423                 , StatsRequestType.GROUP_DESC);
424
425     }
426     
427     public void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
428         
429         GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
430         
431         input.setNode(targetNode);
432
433         Future<RpcResult<GetAllMeterStatisticsOutput>> response = 
434                 meterStatsService.getAllMeterStatistics(input.build());
435         
436         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
437                 , StatsRequestType.ALL_METER);;
438
439     }
440     
441     public void sendMeterConfigStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
442         
443         GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
444         
445         input.setNode(targetNode);
446
447         Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response = 
448                 meterStatsService.getAllMeterConfigStatistics(input.build());
449         
450         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
451                 , StatsRequestType.METER_CONFIG);;
452
453     }
454     
455     public void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
456         GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
457         
458         input.setNode(targetNode);
459         
460         Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response = 
461                 queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
462         
463         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
464                 , StatsRequestType.ALL_QUEUE_STATS);;
465
466     }
467
468     public void sendQueueStatsFromGivenNodeConnector(NodeRef targetNode,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
469         GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder();
470         
471         input.setNode(targetNode);
472         input.setNodeConnectorId(nodeConnectorId);
473         input.setQueueId(queueId);
474         Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response = 
475                 queueStatsService.getQueueStatisticsFromGivenPort(input.build());
476         
477         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
478                 , StatsRequestType.ALL_QUEUE_STATS);;
479
480     }
481
482     public ConcurrentMap<NodeId, NodeStatisticsAger> getStatisticsCache() {
483         return statisticsCache;
484     }
485     
486     private List<Node> getAllConnectedNodes(){
487         
488         Nodes nodes = (Nodes) dps.readOperationalData(nodesIdentifier);
489         if(nodes == null)
490             return null;
491         
492         spLogger.info("Number of connected nodes : {}",nodes.getNode().size());
493         return nodes.getNode();
494     }
495     
496     private List<Short> getTablesFromNode(NodeKey nodeKey){
497         InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
498         
499         FlowCapableNode node = (FlowCapableNode)dps.readOperationalData(nodesIdentifier);
500         List<Short> tablesId = new ArrayList<Short>();
501         if(node != null && node.getTable()!=null){
502             spLogger.info("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
503             for(Table table: node.getTable()){
504                 tablesId.add(table.getId());
505             }
506         }
507         return tablesId;
508     }
509
510     @SuppressWarnings("unchecked")
511     private NodeId getNodeId(NodeRef nodeRef){
512         InstanceIdentifier<Node> nodeII = (InstanceIdentifier<Node>) nodeRef.getValue();
513         NodeKey nodeKey = InstanceIdentifier.keyOf(nodeII);
514         return nodeKey.getId();
515     }
516     
517     @SuppressWarnings("deprecation")
518     @Override
519     public void close(){
520         
521         try {
522             spLogger.info("Statistics Provider stopped.");
523             if (this.listenerRegistration != null) {
524               
525                 this.listenerRegistration.close();
526                 
527                 this.statisticsRequesterThread.destroy();
528                 
529                 this.statisticsAgerThread.destroy();
530             
531             }
532           } catch (Throwable e) {
533             throw Exceptions.sneakyThrow(e);
534           }
535     }
536
537 }