Centralize NodeStatisticsAger creation
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / StatisticsProvider.java
1 /*
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.statistics.manager;
9
10 import java.util.ArrayList;
11 import java.util.List;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.Future;
16
17 import org.eclipse.xtext.xbase.lib.Exceptions;
18 import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
19 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
20 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
21 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
22 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsOutput;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortInputBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortOutput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
66 import org.opendaylight.yangtools.concepts.Registration;
67 import org.opendaylight.yangtools.yang.binding.DataObject;
68 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
69 import org.opendaylight.yangtools.yang.binding.NotificationListener;
70 import org.opendaylight.yangtools.yang.common.RpcResult;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73
74 /**
75  * Following are main responsibilities of the class:
76  * 1) Invoke statistics request thread to send periodic statistics request to all the
77  * flow capable switch connected to the controller. It sends statistics request for
78  * Group,Meter,Table,Flow,Queue,Aggregate stats.
79  *
80  * 2) Invoke statistics ager thread, to clean up all the stale statistics data from
81  * operational data store.
82  *
83  * @author avishnoi@in.ibm.com
84  *
85  */
86 public class StatisticsProvider implements AutoCloseable {
87
88     public final static Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
89
90     private DataProviderService dps;
91
92     private DataBrokerService dbs;
93
94     private NotificationProviderService nps;
95
96     private OpendaylightGroupStatisticsService groupStatsService;
97
98     private OpendaylightMeterStatisticsService meterStatsService;
99
100     private OpendaylightFlowStatisticsService flowStatsService;
101
102     private OpendaylightPortStatisticsService portStatsService;
103
104     private OpendaylightFlowTableStatisticsService flowTableStatsService;
105
106     private OpendaylightQueueStatisticsService queueStatsService;
107
108     private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
109
110     private StatisticsUpdateHandler statsUpdateHandler;
111
112     private Thread statisticsRequesterThread;
113
114     private Thread statisticsAgerThread;
115
116     private final  InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
117
118     public static final int STATS_THREAD_EXECUTION_TIME= 15000;
119     //Local caching of stats
120
121     private final ConcurrentMap<NodeId,NodeStatisticsAger> statisticsCache =
122             new ConcurrentHashMap<NodeId,NodeStatisticsAger>();
123
124     public DataProviderService getDataService() {
125       return this.dps;
126     }
127
128     public void setDataService(final DataProviderService dataService) {
129       this.dps = dataService;
130     }
131
132     public DataBrokerService getDataBrokerService() {
133         return this.dbs;
134     }
135
136     public void setDataBrokerService(final DataBrokerService dataBrokerService) {
137         this.dbs = dataBrokerService;
138     }
139
140     public NotificationProviderService getNotificationService() {
141       return this.nps;
142     }
143
144     public void setNotificationService(final NotificationProviderService notificationService) {
145       this.nps = notificationService;
146     }
147
148     public MultipartMessageManager getMultipartMessageManager() {
149         return multipartMessageManager;
150     }
151
152     private final StatisticsUpdateCommiter updateCommiter = new StatisticsUpdateCommiter(StatisticsProvider.this);
153
154     private Registration<NotificationListener> listenerRegistration;
155
156     public void start() {
157
158         NotificationProviderService nps = this.getNotificationService();
159         Registration<NotificationListener> registerNotificationListener = nps.registerNotificationListener(this.updateCommiter);
160         this.listenerRegistration = registerNotificationListener;
161
162         statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
163
164         registerDataStoreUpdateListener(this.getDataBrokerService());
165
166         // Get Group/Meter statistics service instance
167         groupStatsService = StatisticsManagerActivator.getProviderContext().
168                 getRpcService(OpendaylightGroupStatisticsService.class);
169
170         meterStatsService = StatisticsManagerActivator.getProviderContext().
171                 getRpcService(OpendaylightMeterStatisticsService.class);
172
173         flowStatsService = StatisticsManagerActivator.getProviderContext().
174                 getRpcService(OpendaylightFlowStatisticsService.class);
175
176         portStatsService = StatisticsManagerActivator.getProviderContext().
177                 getRpcService(OpendaylightPortStatisticsService.class);
178
179         flowTableStatsService = StatisticsManagerActivator.getProviderContext().
180                 getRpcService(OpendaylightFlowTableStatisticsService.class);
181
182         queueStatsService = StatisticsManagerActivator.getProviderContext().
183                 getRpcService(OpendaylightQueueStatisticsService.class);
184
185         statisticsRequesterThread = new Thread( new Runnable(){
186
187             @Override
188             public void run() {
189                 while(true){
190                     try {
191                         statsRequestSender();
192
193                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
194                     }catch (Exception e){
195                         spLogger.error("Exception occurred while sending stats request : {}",e);
196                     }
197                 }
198             }
199         });
200
201         spLogger.debug("Statistics requester thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
202
203         statisticsRequesterThread.start();
204
205         statisticsAgerThread = new Thread( new Runnable(){
206
207             @Override
208             public void run() {
209                 while(true){
210                     try {
211                         for(NodeStatisticsAger nodeStatisticsAger : statisticsCache.values()){
212                             nodeStatisticsAger.cleanStaleStatistics();
213                         }
214                         multipartMessageManager.cleanStaleTransactionIds();
215
216                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
217                     }catch (Exception e){
218                         spLogger.error("Exception occurred while sending stats request : {}",e);
219                     }
220                 }
221             }
222         });
223
224         spLogger.debug("Statistics ager thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
225
226         statisticsAgerThread.start();
227
228         spLogger.info("Statistics Provider started.");
229     }
230
231     private void registerDataStoreUpdateListener(DataBrokerService dbs) {
232         //Register for Node updates
233         InstanceIdentifier<? extends DataObject> pathNode = InstanceIdentifier.builder(Nodes.class)
234                                                                         .child(Node.class).toInstance();
235         dbs.registerDataChangeListener(pathNode, statsUpdateHandler);
236
237         //Register for flow updates
238         InstanceIdentifier<? extends DataObject> pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class)
239                                                                     .augmentation(FlowCapableNode.class)
240                                                                     .child(Table.class)
241                                                                     .child(Flow.class).toInstance();
242         dbs.registerDataChangeListener(pathFlow, statsUpdateHandler);
243
244         //Register for meter updates
245         InstanceIdentifier<? extends DataObject> pathMeter = InstanceIdentifier.builder(Nodes.class).child(Node.class)
246                                                     .augmentation(FlowCapableNode.class)
247                                                     .child(Meter.class).toInstance();
248
249         dbs.registerDataChangeListener(pathMeter, statsUpdateHandler);
250
251         //Register for group updates
252         InstanceIdentifier<? extends DataObject> pathGroup = InstanceIdentifier.builder(Nodes.class).child(Node.class)
253                                                     .augmentation(FlowCapableNode.class)
254                                                     .child(Group.class).toInstance();
255         dbs.registerDataChangeListener(pathGroup, statsUpdateHandler);
256
257         //Register for queue updates
258         InstanceIdentifier<? extends DataObject> pathQueue = InstanceIdentifier.builder(Nodes.class).child(Node.class)
259                                                                     .child(NodeConnector.class)
260                                                                     .augmentation(FlowCapableNodeConnector.class)
261                                                                     .child(Queue.class).toInstance();
262         dbs.registerDataChangeListener(pathQueue, statsUpdateHandler);
263     }
264
265     protected DataModificationTransaction startChange() {
266
267         DataProviderService dps = this.getDataService();
268         return dps.beginTransaction();
269     }
270
271     private void statsRequestSender(){
272
273         List<Node> targetNodes = getAllConnectedNodes();
274
275         if(targetNodes == null)
276             return;
277
278
279         for (Node targetNode : targetNodes){
280
281             if(targetNode.getAugmentation(FlowCapableNode.class) != null){
282                 sendStatisticsRequestsToNode(targetNode);
283             }
284         }
285     }
286
287     public void sendStatisticsRequestsToNode(Node targetNode){
288
289         spLogger.debug("Send requests for statistics collection to node : {})",targetNode.getId());
290
291         InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
292
293         NodeRef targetNodeRef = new NodeRef(targetInstanceId);
294
295         try{
296             if(flowStatsService != null){
297                 sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
298                 sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
299             }
300             if(flowTableStatsService != null){
301                 sendAllFlowTablesStatisticsRequest(targetNodeRef);
302             }
303             if(portStatsService != null){
304                 sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
305             }
306             if(groupStatsService != null){
307                 sendAllGroupStatisticsRequest(targetNodeRef);
308                 sendGroupDescriptionRequest(targetNodeRef);
309             }
310             if(meterStatsService != null){
311                 sendAllMeterStatisticsRequest(targetNodeRef);
312                 sendMeterConfigStatisticsRequest(targetNodeRef);
313             }
314             if(queueStatsService != null){
315                 sendAllQueueStatsFromAllNodeConnector (targetNodeRef);
316             }
317         }catch(Exception e){
318             spLogger.error("Exception occured while sending statistics requests : {}", e);
319         }
320     }
321
322
323     public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
324         final GetFlowTablesStatisticsInputBuilder input =
325                 new GetFlowTablesStatisticsInputBuilder();
326
327         input.setNode(targetNodeRef);
328
329         Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
330                 flowTableStatsService.getFlowTablesStatistics(input.build());
331
332         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNodeRef),response.get().getResult().getTransactionId()
333                 , StatsRequestType.ALL_FLOW_TABLE);
334
335     }
336
337     public void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
338         final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
339                 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
340
341         input.setNode(targetNode);
342
343         Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
344                 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
345
346         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
347                 , StatsRequestType.ALL_FLOW);
348
349     }
350
351     public void sendFlowStatsFromTableRequest(NodeRef targetNode,Flow flow) throws InterruptedException, ExecutionException{
352         final GetFlowStatisticsFromFlowTableInputBuilder input =
353                 new GetFlowStatisticsFromFlowTableInputBuilder();
354
355         input.setNode(targetNode);
356         input.fieldsFrom(flow);
357
358         Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response =
359                 flowStatsService.getFlowStatisticsFromFlowTable(input.build());
360
361         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
362                 , StatsRequestType.ALL_FLOW);
363
364     }
365
366     public void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
367
368         List<Short> tablesId = getTablesFromNode(targetNodeKey);
369
370         if(tablesId.size() != 0){
371             for(Short id : tablesId){
372
373                 sendAggregateFlowsStatsFromTableRequest(targetNodeKey,id);
374             }
375         }else{
376             spLogger.debug("No details found in data store for flow tables associated with Node {}",targetNodeKey);
377         }
378     }
379
380     public void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
381
382         spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey);
383         GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
384                 new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
385
386         input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
387         input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId));
388         Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
389                 flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
390
391         multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), tableId);
392         this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId()
393                 , StatsRequestType.AGGR_FLOW);
394     }
395
396     public void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
397
398         final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
399
400         input.setNode(targetNode);
401
402         Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
403                 portStatsService.getAllNodeConnectorsStatistics(input.build());
404         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
405                 , StatsRequestType.ALL_PORT);
406
407     }
408
409     public void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
410
411         final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
412
413         input.setNode(targetNode);
414
415         Future<RpcResult<GetAllGroupStatisticsOutput>> response =
416                 groupStatsService.getAllGroupStatistics(input.build());
417
418         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
419                 , StatsRequestType.ALL_GROUP);
420
421     }
422
423     public void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
424         final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
425
426         input.setNode(targetNode);
427
428         Future<RpcResult<GetGroupDescriptionOutput>> response =
429                 groupStatsService.getGroupDescription(input.build());
430
431         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
432                 , StatsRequestType.GROUP_DESC);
433
434     }
435
436     public void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
437
438         GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
439
440         input.setNode(targetNode);
441
442         Future<RpcResult<GetAllMeterStatisticsOutput>> response =
443                 meterStatsService.getAllMeterStatistics(input.build());
444
445         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
446                 , StatsRequestType.ALL_METER);;
447
448     }
449
450     public void sendMeterConfigStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
451
452         GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
453
454         input.setNode(targetNode);
455
456         Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
457                 meterStatsService.getAllMeterConfigStatistics(input.build());
458
459         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
460                 , StatsRequestType.METER_CONFIG);;
461
462     }
463
464     public void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
465         GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
466
467         input.setNode(targetNode);
468
469         Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
470                 queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
471
472         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
473                 , StatsRequestType.ALL_QUEUE_STATS);;
474
475     }
476
477     public void sendQueueStatsFromGivenNodeConnector(NodeRef targetNode,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
478         GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder();
479
480         input.setNode(targetNode);
481         input.setNodeConnectorId(nodeConnectorId);
482         input.setQueueId(queueId);
483         Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response =
484                 queueStatsService.getQueueStatisticsFromGivenPort(input.build());
485
486         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
487                 , StatsRequestType.ALL_QUEUE_STATS);;
488
489     }
490
491     public final NodeStatisticsAger getStatisticsAger(final NodeId nodeId) {
492         NodeStatisticsAger ager = statisticsCache.get(nodeId);
493         if (ager == null) {
494             ager = new NodeStatisticsAger(this, new NodeKey(nodeId));
495             statisticsCache.put(nodeId, ager);
496         }
497
498         return ager;
499     }
500
501     private List<Node> getAllConnectedNodes(){
502         Nodes nodes = (Nodes) dps.readOperationalData(nodesIdentifier);
503         if(nodes == null)
504             return null;
505
506         spLogger.debug("Number of connected nodes : {}",nodes.getNode().size());
507         return nodes.getNode();
508     }
509
510     private List<Short> getTablesFromNode(NodeKey nodeKey){
511         InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
512
513         FlowCapableNode node = (FlowCapableNode)dps.readOperationalData(nodesIdentifier);
514         List<Short> tablesId = new ArrayList<Short>();
515         if(node != null && node.getTable()!=null){
516             spLogger.debug("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
517             for(Table table: node.getTable()){
518                 tablesId.add(table.getId());
519             }
520         }
521         return tablesId;
522     }
523
524     @SuppressWarnings("unchecked")
525     private NodeId getNodeId(NodeRef nodeRef){
526         InstanceIdentifier<Node> nodeII = (InstanceIdentifier<Node>) nodeRef.getValue();
527         NodeKey nodeKey = InstanceIdentifier.keyOf(nodeII);
528         return nodeKey.getId();
529     }
530
531     @SuppressWarnings("deprecation")
532     @Override
533     public void close(){
534
535         try {
536             spLogger.info("Statistics Provider stopped.");
537             if (this.listenerRegistration != null) {
538
539                 this.listenerRegistration.close();
540
541                 this.statisticsRequesterThread.destroy();
542
543                 this.statisticsAgerThread.destroy();
544
545             }
546           } catch (Throwable e) {
547             throw Exceptions.sneakyThrow(e);
548           }
549     }
550
551 }