Merge "Do not rely on DataStore for all connected nodes"
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / StatisticsProvider.java
1 /*
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.statistics.manager;
9
10 import java.util.ArrayList;
11 import java.util.Collection;
12 import java.util.List;
13 import java.util.concurrent.ConcurrentHashMap;
14 import java.util.concurrent.ConcurrentMap;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.Future;
17
18 import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
19 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
20 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
21 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
22 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
23 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
24 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsOutput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsOutput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortInputBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortOutput;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
68 import org.opendaylight.yangtools.concepts.ListenerRegistration;
69 import org.opendaylight.yangtools.concepts.Registration;
70 import org.opendaylight.yangtools.yang.binding.DataObject;
71 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
72 import org.opendaylight.yangtools.yang.binding.NotificationListener;
73 import org.opendaylight.yangtools.yang.common.RpcResult;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
77 import com.google.common.base.Preconditions;
78
79 /**
80  * Following are main responsibilities of the class:
81  * 1) Invoke statistics request thread to send periodic statistics request to all the
82  * flow capable switch connected to the controller. It sends statistics request for
83  * Group,Meter,Table,Flow,Queue,Aggregate stats.
84  *
85  * 2) Invoke statistics ager thread, to clean up all the stale statistics data from
86  * operational data store.
87  *
88  * @author avishnoi@in.ibm.com
89  *
90  */
91 public class StatisticsProvider implements AutoCloseable {
92     public static final int STATS_THREAD_EXECUTION_TIME= 15000;
93
94     private static final Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
95
96     private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
97     private final DataProviderService dps;
98
99     //Local caching of stats
100     private final ConcurrentMap<NodeId, NodeStatisticsHandler> handlers = new ConcurrentHashMap<>();
101
102     private OpendaylightGroupStatisticsService groupStatsService;
103
104     private OpendaylightMeterStatisticsService meterStatsService;
105
106     private OpendaylightFlowStatisticsService flowStatsService;
107
108     private OpendaylightPortStatisticsService portStatsService;
109
110     private OpendaylightFlowTableStatisticsService flowTableStatsService;
111
112     private OpendaylightQueueStatisticsService queueStatsService;
113
114     private StatisticsUpdateHandler statsUpdateHandler;
115
116     private Thread statisticsRequesterThread;
117
118     private Thread statisticsAgerThread;
119
120
121     public StatisticsProvider(final DataProviderService dataService) {
122         this.dps = Preconditions.checkNotNull(dataService);
123     }
124
125     public MultipartMessageManager getMultipartMessageManager() {
126         return multipartMessageManager;
127     }
128
129     private final StatisticsListener updateCommiter = new StatisticsListener(StatisticsProvider.this);
130
131     private Registration<NotificationListener> listenerRegistration;
132
133     private ListenerRegistration<DataChangeListener> flowCapableTrackerRegistration;
134
135     public void start(final DataBrokerService dbs, final NotificationProviderService nps, final RpcConsumerRegistry rpcRegistry) {
136
137         // Get Group/Meter statistics service instances
138         groupStatsService = rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class);
139         meterStatsService = rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class);
140         flowStatsService = rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class);
141         portStatsService = rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class);
142         flowTableStatsService = rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class);
143         queueStatsService = rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class);
144
145         // Start receiving notifications
146         this.listenerRegistration = nps.registerNotificationListener(this.updateCommiter);
147
148         // Register for switch connect/disconnect notifications
149         final InstanceIdentifier<FlowCapableNode> fcnId = InstanceIdentifier.builder(Nodes.class)
150                 .child(Node.class).augmentation(FlowCapableNode.class).build();
151         this.flowCapableTrackerRegistration = dbs.registerDataChangeListener(fcnId,
152                 new FlowCapableTracker(this, fcnId));
153
154         statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
155         registerDataStoreUpdateListener(dbs);
156
157         statisticsRequesterThread = new Thread( new Runnable(){
158
159             @Override
160             public void run() {
161                 while(true){
162                     try {
163                         statsRequestSender();
164
165                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
166                     }catch (Exception e){
167                         spLogger.error("Exception occurred while sending stats request : {}",e);
168                     }
169                 }
170             }
171         });
172
173         spLogger.debug("Statistics requester thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
174
175         statisticsRequesterThread.start();
176
177         statisticsAgerThread = new Thread( new Runnable(){
178
179             @Override
180             public void run() {
181                 while(true){
182                     try {
183                         for(NodeStatisticsHandler nodeStatisticsAger : handlers.values()){
184                             nodeStatisticsAger.cleanStaleStatistics();
185                         }
186                         multipartMessageManager.cleanStaleTransactionIds();
187
188                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
189                     }catch (Exception e){
190                         spLogger.error("Exception occurred while sending stats request : {}",e);
191                     }
192                 }
193             }
194         });
195
196         spLogger.debug("Statistics ager thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
197
198         statisticsAgerThread.start();
199
200         spLogger.info("Statistics Provider started.");
201     }
202
203     private void registerDataStoreUpdateListener(DataBrokerService dbs) {
204         // FIXME: the below should be broken out into StatisticsUpdateHandler
205
206         //Register for flow updates
207         InstanceIdentifier<? extends DataObject> pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class)
208                                                                     .augmentation(FlowCapableNode.class)
209                                                                     .child(Table.class)
210                                                                     .child(Flow.class).toInstance();
211         dbs.registerDataChangeListener(pathFlow, statsUpdateHandler);
212
213         //Register for meter updates
214         InstanceIdentifier<? extends DataObject> pathMeter = InstanceIdentifier.builder(Nodes.class).child(Node.class)
215                                                     .augmentation(FlowCapableNode.class)
216                                                     .child(Meter.class).toInstance();
217
218         dbs.registerDataChangeListener(pathMeter, statsUpdateHandler);
219
220         //Register for group updates
221         InstanceIdentifier<? extends DataObject> pathGroup = InstanceIdentifier.builder(Nodes.class).child(Node.class)
222                                                     .augmentation(FlowCapableNode.class)
223                                                     .child(Group.class).toInstance();
224         dbs.registerDataChangeListener(pathGroup, statsUpdateHandler);
225
226         //Register for queue updates
227         InstanceIdentifier<? extends DataObject> pathQueue = InstanceIdentifier.builder(Nodes.class).child(Node.class)
228                                                                     .child(NodeConnector.class)
229                                                                     .augmentation(FlowCapableNodeConnector.class)
230                                                                     .child(Queue.class).toInstance();
231         dbs.registerDataChangeListener(pathQueue, statsUpdateHandler);
232     }
233
234     protected DataModificationTransaction startChange() {
235         return dps.beginTransaction();
236     }
237
238     private void statsRequestSender() {
239         for (NodeStatisticsHandler h : handlers.values()) {
240             sendStatisticsRequestsToNode(h.getTargetNodeKey());
241         }
242     }
243
244     private void sendStatisticsRequestsToNode(NodeKey targetNode){
245
246         spLogger.debug("Send requests for statistics collection to node : {})",targetNode.getId());
247
248         InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode).toInstance();
249
250         NodeRef targetNodeRef = new NodeRef(targetInstanceId);
251
252         try{
253             if(flowStatsService != null){
254                 sendAggregateFlowsStatsFromAllTablesRequest(targetNode);
255                 sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
256             }
257             if(flowTableStatsService != null){
258                 sendAllFlowTablesStatisticsRequest(targetNodeRef);
259             }
260             if(portStatsService != null){
261                 sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
262             }
263             if(groupStatsService != null){
264                 sendAllGroupStatisticsRequest(targetNodeRef);
265                 sendGroupDescriptionRequest(targetNodeRef);
266             }
267             if(meterStatsService != null){
268                 sendAllMeterStatisticsRequest(targetNodeRef);
269                 sendMeterConfigStatisticsRequest(targetNodeRef);
270             }
271             if(queueStatsService != null){
272                 sendAllQueueStatsFromAllNodeConnector (targetNodeRef);
273             }
274         }catch(Exception e){
275             spLogger.error("Exception occured while sending statistics requests : {}", e);
276         }
277     }
278
279
280     private void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
281         final GetFlowTablesStatisticsInputBuilder input =
282                 new GetFlowTablesStatisticsInputBuilder();
283
284         input.setNode(targetNodeRef);
285
286         Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
287                 flowTableStatsService.getFlowTablesStatistics(input.build());
288
289         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNodeRef),response.get().getResult().getTransactionId()
290                 , StatsRequestType.ALL_FLOW_TABLE);
291
292     }
293
294     private void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
295         final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
296                 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
297
298         input.setNode(targetNode);
299
300         Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
301                 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
302
303         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
304                 , StatsRequestType.ALL_FLOW);
305
306     }
307
308     public void sendFlowStatsFromTableRequest(NodeRef targetNode,Flow flow) throws InterruptedException, ExecutionException{
309         final GetFlowStatisticsFromFlowTableInputBuilder input =
310                 new GetFlowStatisticsFromFlowTableInputBuilder();
311
312         input.setNode(targetNode);
313         input.fieldsFrom(flow);
314
315         Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response =
316                 flowStatsService.getFlowStatisticsFromFlowTable(input.build());
317
318         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
319                 , StatsRequestType.ALL_FLOW);
320
321     }
322
323     private void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
324
325         List<Short> tablesId = getTablesFromNode(targetNodeKey);
326
327         if(tablesId.size() != 0){
328             for(Short id : tablesId){
329
330                 sendAggregateFlowsStatsFromTableRequest(targetNodeKey,id);
331             }
332         }else{
333             spLogger.debug("No details found in data store for flow tables associated with Node {}",targetNodeKey);
334         }
335     }
336
337     private void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
338
339         spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey);
340         GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
341                 new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
342
343         input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
344         input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId));
345         Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
346                 flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
347
348         multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), tableId);
349         this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId()
350                 , StatsRequestType.AGGR_FLOW);
351     }
352
353     private void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
354
355         final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
356
357         input.setNode(targetNode);
358
359         Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
360                 portStatsService.getAllNodeConnectorsStatistics(input.build());
361         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
362                 , StatsRequestType.ALL_PORT);
363
364     }
365
366     private void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
367
368         final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
369
370         input.setNode(targetNode);
371
372         Future<RpcResult<GetAllGroupStatisticsOutput>> response =
373                 groupStatsService.getAllGroupStatistics(input.build());
374
375         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
376                 , StatsRequestType.ALL_GROUP);
377
378     }
379
380     public void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
381         final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
382
383         input.setNode(targetNode);
384
385         Future<RpcResult<GetGroupDescriptionOutput>> response =
386                 groupStatsService.getGroupDescription(input.build());
387
388         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
389                 , StatsRequestType.GROUP_DESC);
390
391     }
392
393     private void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
394
395         GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
396
397         input.setNode(targetNode);
398
399         Future<RpcResult<GetAllMeterStatisticsOutput>> response =
400                 meterStatsService.getAllMeterStatistics(input.build());
401
402         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
403                 , StatsRequestType.ALL_METER);;
404
405     }
406
407     public void sendMeterConfigStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
408
409         GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
410
411         input.setNode(targetNode);
412
413         Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
414                 meterStatsService.getAllMeterConfigStatistics(input.build());
415
416         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
417                 , StatsRequestType.METER_CONFIG);;
418
419     }
420
421     private void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
422         GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
423
424         input.setNode(targetNode);
425
426         Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
427                 queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
428
429         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
430                 , StatsRequestType.ALL_QUEUE_STATS);;
431
432     }
433
434     public void sendQueueStatsFromGivenNodeConnector(NodeRef targetNode,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
435         GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder();
436
437         input.setNode(targetNode);
438         input.setNodeConnectorId(nodeConnectorId);
439         input.setQueueId(queueId);
440         Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response =
441                 queueStatsService.getQueueStatisticsFromGivenPort(input.build());
442
443         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
444                 , StatsRequestType.ALL_QUEUE_STATS);;
445
446     }
447
448     /**
449      * Get the handler for a particular node.
450      *
451      * @param nodeId source node
452      * @return Node statistics handler for that node. Null if the statistics should
453      *         not handled.
454      */
455     public final NodeStatisticsHandler getStatisticsHandler(final NodeId nodeId) {
456         Preconditions.checkNotNull(nodeId);
457         NodeStatisticsHandler handler = handlers.get(nodeId);
458         if (handler == null) {
459             spLogger.info("Attempted to get non-existing handler for {}", nodeId);
460         }
461         return handler;
462     }
463
464     private List<Short> getTablesFromNode(NodeKey nodeKey){
465         InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
466
467         FlowCapableNode node = (FlowCapableNode)dps.readOperationalData(nodesIdentifier);
468         List<Short> tablesId = new ArrayList<Short>();
469         if(node != null && node.getTable()!=null){
470             spLogger.debug("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
471             for(Table table: node.getTable()){
472                 tablesId.add(table.getId());
473             }
474         }
475         return tablesId;
476     }
477
478     @SuppressWarnings("unchecked")
479     private NodeId getNodeId(NodeRef nodeRef){
480         InstanceIdentifier<Node> nodeII = (InstanceIdentifier<Node>) nodeRef.getValue();
481         NodeKey nodeKey = InstanceIdentifier.keyOf(nodeII);
482         return nodeKey.getId();
483     }
484
485     @Override
486     public void close() {
487         try {
488             if (this.listenerRegistration != null) {
489                 this.listenerRegistration.close();
490                 this.statisticsRequesterThread.destroy();
491                 this.statisticsAgerThread.destroy();
492             }
493             if (this.flowCapableTrackerRegistration != null) {
494                 this.flowCapableTrackerRegistration.close();
495                 this.flowCapableTrackerRegistration = null;
496             }
497         } catch (Exception e) {
498             spLogger.warn("Failed to stop Statistics Provider completely", e);
499         } finally {
500             spLogger.info("Statistics Provider stopped.");
501         }
502     }
503
504     synchronized void startNodeHandlers(final Collection<NodeKey> addedNodes) {
505         for (NodeKey key : addedNodes) {
506             if (handlers.containsKey(key.getId())) {
507                 spLogger.warn("Attempted to start already-existing handler for {}, very strange", key.getId());
508                 continue;
509             }
510
511             final NodeStatisticsHandler h = new NodeStatisticsHandler(this, key);
512             handlers.put(key.getId(), h);
513             spLogger.debug("Started node handler for {}", key.getId());
514
515             // FIXME: this should be in the NodeStatisticsHandler itself
516             sendStatisticsRequestsToNode(key);
517         }
518     }
519
520     synchronized void stopNodeHandlers(final Collection<NodeKey> removedNodes) {
521         for (NodeKey key : removedNodes) {
522             final NodeStatisticsHandler s = handlers.remove(key.getId());
523             if (s != null) {
524                 spLogger.debug("Stopping node handler for {}", key.getId());
525                 s.close();
526             } else {
527                 spLogger.warn("Attempted to remove non-existing handler for {}, very strange", key.getId());
528             }
529         }
530     }
531 }