4851b441c36d6ab4b71b8ea74ba1dc94443088d3
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / StatisticsProvider.java
1 /*
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.statistics.manager;
9
10 import java.util.ArrayList;
11 import java.util.Collection;
12 import java.util.List;
13 import java.util.concurrent.ConcurrentHashMap;
14 import java.util.concurrent.ConcurrentMap;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.Future;
17
18 import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
19 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
20 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
21 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
22 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
23 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
24 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsOutput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsOutput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortInputBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortOutput;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
68 import org.opendaylight.yangtools.concepts.ListenerRegistration;
69 import org.opendaylight.yangtools.concepts.Registration;
70 import org.opendaylight.yangtools.yang.binding.DataObject;
71 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
72 import org.opendaylight.yangtools.yang.binding.NotificationListener;
73 import org.opendaylight.yangtools.yang.common.RpcResult;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
77 import com.google.common.base.Preconditions;
78
79 /**
80  * Following are main responsibilities of the class:
81  * 1) Invoke statistics request thread to send periodic statistics request to all the
82  * flow capable switch connected to the controller. It sends statistics request for
83  * Group,Meter,Table,Flow,Queue,Aggregate stats.
84  *
85  * 2) Invoke statistics ager thread, to clean up all the stale statistics data from
86  * operational data store.
87  *
88  * @author avishnoi@in.ibm.com
89  *
90  */
91 public class StatisticsProvider implements AutoCloseable {
92     public static final int STATS_THREAD_EXECUTION_TIME= 15000;
93
94     private static final Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
95
96     private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
97     private final InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
98     private final DataProviderService dps;
99
100     //Local caching of stats
101     private final ConcurrentMap<NodeId,NodeStatisticsHandler> handlers = new ConcurrentHashMap<>();
102
103     private OpendaylightGroupStatisticsService groupStatsService;
104
105     private OpendaylightMeterStatisticsService meterStatsService;
106
107     private OpendaylightFlowStatisticsService flowStatsService;
108
109     private OpendaylightPortStatisticsService portStatsService;
110
111     private OpendaylightFlowTableStatisticsService flowTableStatsService;
112
113     private OpendaylightQueueStatisticsService queueStatsService;
114
115     private StatisticsUpdateHandler statsUpdateHandler;
116
117     private Thread statisticsRequesterThread;
118
119     private Thread statisticsAgerThread;
120
121
122     public StatisticsProvider(final DataProviderService dataService) {
123         this.dps = Preconditions.checkNotNull(dataService);
124     }
125
126     public MultipartMessageManager getMultipartMessageManager() {
127         return multipartMessageManager;
128     }
129
130     private final StatisticsListener updateCommiter = new StatisticsListener(StatisticsProvider.this);
131
132     private Registration<NotificationListener> listenerRegistration;
133
134     private ListenerRegistration<DataChangeListener> flowCapableTrackerRegistration;
135
136     public void start(final DataBrokerService dbs, final NotificationProviderService nps, final RpcConsumerRegistry rpcRegistry) {
137
138         // Get Group/Meter statistics service instances
139         groupStatsService = rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class);
140         meterStatsService = rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class);
141         flowStatsService = rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class);
142         portStatsService = rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class);
143         flowTableStatsService = rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class);
144         queueStatsService = rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class);
145
146         // Start receiving notifications
147         this.listenerRegistration = nps.registerNotificationListener(this.updateCommiter);
148
149         // Register for switch connect/disconnect notifications
150         final InstanceIdentifier<FlowCapableNode> fcnId = InstanceIdentifier.builder(Nodes.class)
151                 .child(Node.class).augmentation(FlowCapableNode.class).build();
152         this.flowCapableTrackerRegistration = dbs.registerDataChangeListener(fcnId,
153                 new FlowCapableTracker(this, fcnId));
154
155         statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
156         registerDataStoreUpdateListener(dbs);
157
158         statisticsRequesterThread = new Thread( new Runnable(){
159
160             @Override
161             public void run() {
162                 while(true){
163                     try {
164                         statsRequestSender();
165
166                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
167                     }catch (Exception e){
168                         spLogger.error("Exception occurred while sending stats request : {}",e);
169                     }
170                 }
171             }
172         });
173
174         spLogger.debug("Statistics requester thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
175
176         statisticsRequesterThread.start();
177
178         statisticsAgerThread = new Thread( new Runnable(){
179
180             @Override
181             public void run() {
182                 while(true){
183                     try {
184                         for(NodeStatisticsHandler nodeStatisticsAger : handlers.values()){
185                             nodeStatisticsAger.cleanStaleStatistics();
186                         }
187                         multipartMessageManager.cleanStaleTransactionIds();
188
189                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
190                     }catch (Exception e){
191                         spLogger.error("Exception occurred while sending stats request : {}",e);
192                     }
193                 }
194             }
195         });
196
197         spLogger.debug("Statistics ager thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
198
199         statisticsAgerThread.start();
200
201         spLogger.info("Statistics Provider started.");
202     }
203
204     private void registerDataStoreUpdateListener(DataBrokerService dbs) {
205         // FIXME: the below should be broken out into StatisticsUpdateHandler
206
207         //Register for flow updates
208         InstanceIdentifier<? extends DataObject> pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class)
209                                                                     .augmentation(FlowCapableNode.class)
210                                                                     .child(Table.class)
211                                                                     .child(Flow.class).toInstance();
212         dbs.registerDataChangeListener(pathFlow, statsUpdateHandler);
213
214         //Register for meter updates
215         InstanceIdentifier<? extends DataObject> pathMeter = InstanceIdentifier.builder(Nodes.class).child(Node.class)
216                                                     .augmentation(FlowCapableNode.class)
217                                                     .child(Meter.class).toInstance();
218
219         dbs.registerDataChangeListener(pathMeter, statsUpdateHandler);
220
221         //Register for group updates
222         InstanceIdentifier<? extends DataObject> pathGroup = InstanceIdentifier.builder(Nodes.class).child(Node.class)
223                                                     .augmentation(FlowCapableNode.class)
224                                                     .child(Group.class).toInstance();
225         dbs.registerDataChangeListener(pathGroup, statsUpdateHandler);
226
227         //Register for queue updates
228         InstanceIdentifier<? extends DataObject> pathQueue = InstanceIdentifier.builder(Nodes.class).child(Node.class)
229                                                                     .child(NodeConnector.class)
230                                                                     .augmentation(FlowCapableNodeConnector.class)
231                                                                     .child(Queue.class).toInstance();
232         dbs.registerDataChangeListener(pathQueue, statsUpdateHandler);
233     }
234
235     protected DataModificationTransaction startChange() {
236         return dps.beginTransaction();
237     }
238
239     private void statsRequestSender(){
240
241         List<Node> targetNodes = getAllConnectedNodes();
242
243         if(targetNodes == null)
244             return;
245
246
247         for (Node targetNode : targetNodes){
248
249             if(targetNode.getAugmentation(FlowCapableNode.class) != null){
250                 sendStatisticsRequestsToNode(targetNode.getKey());
251             }
252         }
253     }
254
255     private void sendStatisticsRequestsToNode(NodeKey targetNode){
256
257         spLogger.debug("Send requests for statistics collection to node : {})",targetNode.getId());
258
259         InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode).toInstance();
260
261         NodeRef targetNodeRef = new NodeRef(targetInstanceId);
262
263         try{
264             if(flowStatsService != null){
265                 sendAggregateFlowsStatsFromAllTablesRequest(targetNode);
266                 sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
267             }
268             if(flowTableStatsService != null){
269                 sendAllFlowTablesStatisticsRequest(targetNodeRef);
270             }
271             if(portStatsService != null){
272                 sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
273             }
274             if(groupStatsService != null){
275                 sendAllGroupStatisticsRequest(targetNodeRef);
276                 sendGroupDescriptionRequest(targetNodeRef);
277             }
278             if(meterStatsService != null){
279                 sendAllMeterStatisticsRequest(targetNodeRef);
280                 sendMeterConfigStatisticsRequest(targetNodeRef);
281             }
282             if(queueStatsService != null){
283                 sendAllQueueStatsFromAllNodeConnector (targetNodeRef);
284             }
285         }catch(Exception e){
286             spLogger.error("Exception occured while sending statistics requests : {}", e);
287         }
288     }
289
290
291     private void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
292         final GetFlowTablesStatisticsInputBuilder input =
293                 new GetFlowTablesStatisticsInputBuilder();
294
295         input.setNode(targetNodeRef);
296
297         Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
298                 flowTableStatsService.getFlowTablesStatistics(input.build());
299
300         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNodeRef),response.get().getResult().getTransactionId()
301                 , StatsRequestType.ALL_FLOW_TABLE);
302
303     }
304
305     private void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
306         final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
307                 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
308
309         input.setNode(targetNode);
310
311         Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
312                 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
313
314         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
315                 , StatsRequestType.ALL_FLOW);
316
317     }
318
319     public void sendFlowStatsFromTableRequest(NodeRef targetNode,Flow flow) throws InterruptedException, ExecutionException{
320         final GetFlowStatisticsFromFlowTableInputBuilder input =
321                 new GetFlowStatisticsFromFlowTableInputBuilder();
322
323         input.setNode(targetNode);
324         input.fieldsFrom(flow);
325
326         Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response =
327                 flowStatsService.getFlowStatisticsFromFlowTable(input.build());
328
329         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
330                 , StatsRequestType.ALL_FLOW);
331
332     }
333
334     private void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
335
336         List<Short> tablesId = getTablesFromNode(targetNodeKey);
337
338         if(tablesId.size() != 0){
339             for(Short id : tablesId){
340
341                 sendAggregateFlowsStatsFromTableRequest(targetNodeKey,id);
342             }
343         }else{
344             spLogger.debug("No details found in data store for flow tables associated with Node {}",targetNodeKey);
345         }
346     }
347
348     private void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
349
350         spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey);
351         GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
352                 new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
353
354         input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
355         input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId));
356         Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
357                 flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
358
359         multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), tableId);
360         this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId()
361                 , StatsRequestType.AGGR_FLOW);
362     }
363
364     private void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
365
366         final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
367
368         input.setNode(targetNode);
369
370         Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
371                 portStatsService.getAllNodeConnectorsStatistics(input.build());
372         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
373                 , StatsRequestType.ALL_PORT);
374
375     }
376
377     private void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
378
379         final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
380
381         input.setNode(targetNode);
382
383         Future<RpcResult<GetAllGroupStatisticsOutput>> response =
384                 groupStatsService.getAllGroupStatistics(input.build());
385
386         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
387                 , StatsRequestType.ALL_GROUP);
388
389     }
390
391     public void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
392         final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
393
394         input.setNode(targetNode);
395
396         Future<RpcResult<GetGroupDescriptionOutput>> response =
397                 groupStatsService.getGroupDescription(input.build());
398
399         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
400                 , StatsRequestType.GROUP_DESC);
401
402     }
403
404     private void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
405
406         GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
407
408         input.setNode(targetNode);
409
410         Future<RpcResult<GetAllMeterStatisticsOutput>> response =
411                 meterStatsService.getAllMeterStatistics(input.build());
412
413         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
414                 , StatsRequestType.ALL_METER);;
415
416     }
417
418     public void sendMeterConfigStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
419
420         GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
421
422         input.setNode(targetNode);
423
424         Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
425                 meterStatsService.getAllMeterConfigStatistics(input.build());
426
427         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
428                 , StatsRequestType.METER_CONFIG);;
429
430     }
431
432     private void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
433         GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
434
435         input.setNode(targetNode);
436
437         Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
438                 queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
439
440         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
441                 , StatsRequestType.ALL_QUEUE_STATS);;
442
443     }
444
445     public void sendQueueStatsFromGivenNodeConnector(NodeRef targetNode,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
446         GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder();
447
448         input.setNode(targetNode);
449         input.setNodeConnectorId(nodeConnectorId);
450         input.setQueueId(queueId);
451         Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response =
452                 queueStatsService.getQueueStatisticsFromGivenPort(input.build());
453
454         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
455                 , StatsRequestType.ALL_QUEUE_STATS);;
456
457     }
458
459     /**
460      * Get the handler for a particular node.
461      *
462      * @param nodeId source node
463      * @return Node statistics handler for that node. Null if the statistics should
464      *         not handled.
465      */
466     public final NodeStatisticsHandler getStatisticsHandler(final NodeId nodeId) {
467         Preconditions.checkNotNull(nodeId);
468         NodeStatisticsHandler handler = handlers.get(nodeId);
469         if (handler == null) {
470             spLogger.info("Attempted to get non-existing handler for {}", nodeId);
471         }
472         return handler;
473     }
474
475     private List<Node> getAllConnectedNodes(){
476         Nodes nodes = (Nodes) dps.readOperationalData(nodesIdentifier);
477         if(nodes == null)
478             return null;
479
480         spLogger.debug("Number of connected nodes : {}",nodes.getNode().size());
481         return nodes.getNode();
482     }
483
484     private List<Short> getTablesFromNode(NodeKey nodeKey){
485         InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
486
487         FlowCapableNode node = (FlowCapableNode)dps.readOperationalData(nodesIdentifier);
488         List<Short> tablesId = new ArrayList<Short>();
489         if(node != null && node.getTable()!=null){
490             spLogger.debug("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
491             for(Table table: node.getTable()){
492                 tablesId.add(table.getId());
493             }
494         }
495         return tablesId;
496     }
497
498     @SuppressWarnings("unchecked")
499     private NodeId getNodeId(NodeRef nodeRef){
500         InstanceIdentifier<Node> nodeII = (InstanceIdentifier<Node>) nodeRef.getValue();
501         NodeKey nodeKey = InstanceIdentifier.keyOf(nodeII);
502         return nodeKey.getId();
503     }
504
505     @Override
506     public void close() {
507         try {
508             if (this.listenerRegistration != null) {
509                 this.listenerRegistration.close();
510                 this.statisticsRequesterThread.destroy();
511                 this.statisticsAgerThread.destroy();
512             }
513             if (this.flowCapableTrackerRegistration != null) {
514                 this.flowCapableTrackerRegistration.close();
515                 this.flowCapableTrackerRegistration = null;
516             }
517         } catch (Exception e) {
518             spLogger.warn("Failed to stop Statistics Provider completely", e);
519         } finally {
520             spLogger.info("Statistics Provider stopped.");
521         }
522     }
523
524     synchronized void startNodeHandlers(final Collection<NodeKey> addedNodes) {
525         for (NodeKey key : addedNodes) {
526             if (handlers.containsKey(key.getId())) {
527                 spLogger.warn("Attempted to start already-existing handler for {}, very strange", key.getId());
528                 continue;
529             }
530
531             final NodeStatisticsHandler h = new NodeStatisticsHandler(this, key);
532             handlers.put(key.getId(), h);
533             spLogger.debug("Started node handler for {}", key.getId());
534
535             // FIXME: this should be in the NodeStatisticsHandler itself
536             sendStatisticsRequestsToNode(key);
537         }
538     }
539
540     synchronized void stopNodeHandlers(final Collection<NodeKey> removedNodes) {
541         for (NodeKey key : removedNodes) {
542             final NodeStatisticsHandler s = handlers.remove(key.getId());
543             if (s != null) {
544                 spLogger.debug("Stopping node handler for {}", key.getId());
545                 s.close();
546             } else {
547                 spLogger.warn("Attempted to remove non-existing handler for {}, very strange", key.getId());
548             }
549         }
550     }
551 }