Centralize targetNodeRef in the NodeStatisticsHandler
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / StatisticsProvider.java
1 /*
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.statistics.manager;
9
10 import java.util.Collection;
11 import java.util.concurrent.ConcurrentHashMap;
12 import java.util.concurrent.ConcurrentMap;
13 import java.util.concurrent.ExecutionException;
14 import java.util.concurrent.Future;
15
16 import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
17 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
18 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
19 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
20 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
21 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
22 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsOutput;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortInputBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortOutput;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
67 import org.opendaylight.yangtools.concepts.ListenerRegistration;
68 import org.opendaylight.yangtools.concepts.Registration;
69 import org.opendaylight.yangtools.yang.binding.DataObject;
70 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
71 import org.opendaylight.yangtools.yang.binding.NotificationListener;
72 import org.opendaylight.yangtools.yang.common.RpcResult;
73 import org.slf4j.Logger;
74 import org.slf4j.LoggerFactory;
75
76 import com.google.common.base.Preconditions;
77
78 /**
79  * Following are main responsibilities of the class:
80  * 1) Invoke statistics request thread to send periodic statistics request to all the
81  * flow capable switch connected to the controller. It sends statistics request for
82  * Group,Meter,Table,Flow,Queue,Aggregate stats.
83  *
84  * 2) Invoke statistics ager thread, to clean up all the stale statistics data from
85  * operational data store.
86  *
87  * @author avishnoi@in.ibm.com
88  *
89  */
90 public class StatisticsProvider implements AutoCloseable {
91     public static final int STATS_THREAD_EXECUTION_TIME= 15000;
92
93     private static final Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
94
95     private final ConcurrentMap<NodeId, NodeStatisticsHandler> handlers = new ConcurrentHashMap<>();
96     private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
97     private final DataProviderService dps;
98
99     private OpendaylightGroupStatisticsService groupStatsService;
100
101     private OpendaylightMeterStatisticsService meterStatsService;
102
103     private OpendaylightFlowStatisticsService flowStatsService;
104
105     private OpendaylightPortStatisticsService portStatsService;
106
107     private OpendaylightFlowTableStatisticsService flowTableStatsService;
108
109     private OpendaylightQueueStatisticsService queueStatsService;
110
111     private StatisticsUpdateHandler statsUpdateHandler;
112
113     private Thread statisticsRequesterThread;
114
115     private Thread statisticsAgerThread;
116
117
118     public StatisticsProvider(final DataProviderService dataService) {
119         this.dps = Preconditions.checkNotNull(dataService);
120     }
121
122     public MultipartMessageManager getMultipartMessageManager() {
123         return multipartMessageManager;
124     }
125
126     private final StatisticsListener updateCommiter = new StatisticsListener(StatisticsProvider.this);
127
128     private Registration<NotificationListener> listenerRegistration;
129
130     private ListenerRegistration<DataChangeListener> flowCapableTrackerRegistration;
131
132     public void start(final DataBrokerService dbs, final NotificationProviderService nps, final RpcConsumerRegistry rpcRegistry) {
133
134         // Get Group/Meter statistics service instances
135         groupStatsService = rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class);
136         meterStatsService = rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class);
137         flowStatsService = rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class);
138         portStatsService = rpcRegistry.getRpcService(OpendaylightPortStatisticsService.class);
139         flowTableStatsService = rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class);
140         queueStatsService = rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class);
141
142         // Start receiving notifications
143         this.listenerRegistration = nps.registerNotificationListener(this.updateCommiter);
144
145         // Register for switch connect/disconnect notifications
146         final InstanceIdentifier<FlowCapableNode> fcnId = InstanceIdentifier.builder(Nodes.class)
147                 .child(Node.class).augmentation(FlowCapableNode.class).build();
148         spLogger.debug("Registering FlowCapable tracker to {}", fcnId);
149         this.flowCapableTrackerRegistration = dbs.registerDataChangeListener(fcnId,
150                 new FlowCapableTracker(this, fcnId));
151
152         statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
153         registerDataStoreUpdateListener(dbs);
154
155         statisticsRequesterThread = new Thread( new Runnable(){
156
157             @Override
158             public void run() {
159                 while(true){
160                     try {
161                         statsRequestSender();
162
163                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
164                     }catch (Exception e){
165                         spLogger.error("Exception occurred while sending stats request : {}",e);
166                     }
167                 }
168             }
169         });
170
171         spLogger.debug("Statistics requester thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
172
173         statisticsRequesterThread.start();
174
175         statisticsAgerThread = new Thread( new Runnable(){
176
177             @Override
178             public void run() {
179                 while(true){
180                     try {
181                         for(NodeStatisticsHandler nodeStatisticsAger : handlers.values()){
182                             nodeStatisticsAger.cleanStaleStatistics();
183                         }
184                         multipartMessageManager.cleanStaleTransactionIds();
185
186                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
187                     }catch (Exception e){
188                         spLogger.error("Exception occurred while sending stats request : {}",e);
189                     }
190                 }
191             }
192         });
193
194         spLogger.debug("Statistics ager thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
195
196         statisticsAgerThread.start();
197
198         spLogger.info("Statistics Provider started.");
199     }
200
201     private void registerDataStoreUpdateListener(DataBrokerService dbs) {
202         // FIXME: the below should be broken out into StatisticsUpdateHandler
203
204         //Register for flow updates
205         InstanceIdentifier<? extends DataObject> pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class)
206                                                                     .augmentation(FlowCapableNode.class)
207                                                                     .child(Table.class)
208                                                                     .child(Flow.class).toInstance();
209         dbs.registerDataChangeListener(pathFlow, statsUpdateHandler);
210
211         //Register for meter updates
212         InstanceIdentifier<? extends DataObject> pathMeter = InstanceIdentifier.builder(Nodes.class).child(Node.class)
213                                                     .augmentation(FlowCapableNode.class)
214                                                     .child(Meter.class).toInstance();
215
216         dbs.registerDataChangeListener(pathMeter, statsUpdateHandler);
217
218         //Register for group updates
219         InstanceIdentifier<? extends DataObject> pathGroup = InstanceIdentifier.builder(Nodes.class).child(Node.class)
220                                                     .augmentation(FlowCapableNode.class)
221                                                     .child(Group.class).toInstance();
222         dbs.registerDataChangeListener(pathGroup, statsUpdateHandler);
223
224         //Register for queue updates
225         InstanceIdentifier<? extends DataObject> pathQueue = InstanceIdentifier.builder(Nodes.class).child(Node.class)
226                                                                     .child(NodeConnector.class)
227                                                                     .augmentation(FlowCapableNodeConnector.class)
228                                                                     .child(Queue.class).toInstance();
229         dbs.registerDataChangeListener(pathQueue, statsUpdateHandler);
230     }
231
232     protected DataModificationTransaction startChange() {
233         return dps.beginTransaction();
234     }
235
236     private void statsRequestSender() {
237         for (NodeStatisticsHandler h : handlers.values()) {
238             sendStatisticsRequestsToNode(h);
239         }
240     }
241
242     private void sendStatisticsRequestsToNode(final NodeStatisticsHandler h) {
243         NodeKey targetNode = h.getTargetNodeKey();
244         spLogger.debug("Send requests for statistics collection to node : {}", targetNode.getId());
245
246         try{
247             if(flowTableStatsService != null){
248                 sendAllFlowTablesStatisticsRequest(h);
249             }
250             if(flowStatsService != null){
251                 // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
252                 //        comes back -- we do not have any tables anyway.
253                 sendAggregateFlowsStatsFromAllTablesRequest(h);
254
255                 sendAllFlowsStatsFromAllTablesRequest(h);
256             }
257             if(portStatsService != null){
258                 sendAllNodeConnectorsStatisticsRequest(h);
259             }
260             if(groupStatsService != null){
261                 sendAllGroupStatisticsRequest(h);
262                 sendGroupDescriptionRequest(h.getTargetNodeRef());
263             }
264             if(meterStatsService != null){
265                 sendAllMeterStatisticsRequest(h);
266                 sendMeterConfigStatisticsRequest(h.getTargetNodeRef());
267             }
268             if(queueStatsService != null){
269                 sendAllQueueStatsFromAllNodeConnector(h);
270             }
271         }catch(Exception e){
272             spLogger.error("Exception occured while sending statistics requests : {}", e);
273         }
274     }
275
276
277     private void sendAllFlowTablesStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException {
278         final GetFlowTablesStatisticsInputBuilder input =
279                 new GetFlowTablesStatisticsInputBuilder();
280
281         input.setNode(h.getTargetNodeRef());
282
283         Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
284                 flowTableStatsService.getFlowTablesStatistics(input.build());
285
286         this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),response.get().getResult().getTransactionId()
287                 , StatsRequestType.ALL_FLOW_TABLE);
288
289     }
290
291     private void sendAllFlowsStatsFromAllTablesRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
292         final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
293                 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
294
295         input.setNode(h.getTargetNodeRef());
296
297         Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
298                 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
299
300         this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
301                 , StatsRequestType.ALL_FLOW);
302
303     }
304
305     public void sendFlowStatsFromTableRequest(NodeRef targetNode,Flow flow) throws InterruptedException, ExecutionException{
306         final GetFlowStatisticsFromFlowTableInputBuilder input =
307                 new GetFlowStatisticsFromFlowTableInputBuilder();
308
309         input.setNode(targetNode);
310         input.fieldsFrom(flow);
311
312         Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response =
313                 flowStatsService.getFlowStatisticsFromFlowTable(input.build());
314
315         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
316                 , StatsRequestType.ALL_FLOW);
317
318     }
319
320     private void sendAggregateFlowsStatsFromAllTablesRequest(final NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
321         final Collection<TableKey> tables = h.getKnownTables();
322         spLogger.debug("Node {} supports {} table(s)", h, tables.size());
323
324         for (TableKey key : h.getKnownTables()) {
325             sendAggregateFlowsStatsFromTableRequest(h.getTargetNodeKey(), key.getId().shortValue());
326         }
327     }
328
329     private void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
330
331         spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey);
332         GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
333                 new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
334
335         input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
336         input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId));
337         Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
338                 flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
339
340         multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), tableId);
341         this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId()
342                 , StatsRequestType.AGGR_FLOW);
343     }
344
345     private void sendAllNodeConnectorsStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
346
347         final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
348
349         input.setNode(h.getTargetNodeRef());
350
351         Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
352                 portStatsService.getAllNodeConnectorsStatistics(input.build());
353         this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
354                 , StatsRequestType.ALL_PORT);
355
356     }
357
358     private void sendAllGroupStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
359
360         final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
361
362         input.setNode(h.getTargetNodeRef());
363
364         Future<RpcResult<GetAllGroupStatisticsOutput>> response =
365                 groupStatsService.getAllGroupStatistics(input.build());
366
367         this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
368                 , StatsRequestType.ALL_GROUP);
369
370     }
371
372     public void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
373         final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
374
375         input.setNode(targetNode);
376
377         Future<RpcResult<GetGroupDescriptionOutput>> response =
378                 groupStatsService.getGroupDescription(input.build());
379
380         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
381                 , StatsRequestType.GROUP_DESC);
382
383     }
384
385     private void sendAllMeterStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
386
387         GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
388
389         input.setNode(h.getTargetNodeRef());
390
391         Future<RpcResult<GetAllMeterStatisticsOutput>> response =
392                 meterStatsService.getAllMeterStatistics(input.build());
393
394         this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
395                 , StatsRequestType.ALL_METER);;
396
397     }
398
399     public void sendMeterConfigStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
400
401         GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
402
403         input.setNode(targetNode);
404
405         Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
406                 meterStatsService.getAllMeterConfigStatistics(input.build());
407
408         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
409                 , StatsRequestType.METER_CONFIG);;
410
411     }
412
413     private void sendAllQueueStatsFromAllNodeConnector(NodeStatisticsHandler h) throws InterruptedException, ExecutionException {
414         GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
415
416         input.setNode(h.getTargetNodeRef());
417
418         Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
419                 queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
420
421         this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
422                 , StatsRequestType.ALL_QUEUE_STATS);;
423
424     }
425
426     public void sendQueueStatsFromGivenNodeConnector(NodeRef targetNode,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
427         GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder();
428
429         input.setNode(targetNode);
430         input.setNodeConnectorId(nodeConnectorId);
431         input.setQueueId(queueId);
432         Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response =
433                 queueStatsService.getQueueStatisticsFromGivenPort(input.build());
434
435         this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
436                 , StatsRequestType.ALL_QUEUE_STATS);;
437
438     }
439
440     /**
441      * Get the handler for a particular node.
442      *
443      * @param nodeId source node
444      * @return Node statistics handler for that node. Null if the statistics should
445      *         not handled.
446      */
447     public final NodeStatisticsHandler getStatisticsHandler(final NodeId nodeId) {
448         Preconditions.checkNotNull(nodeId);
449         NodeStatisticsHandler handler = handlers.get(nodeId);
450         if (handler == null) {
451             spLogger.info("Attempted to get non-existing handler for {}", nodeId);
452         }
453         return handler;
454     }
455
456     @SuppressWarnings("unchecked")
457     private NodeId getNodeId(NodeRef nodeRef){
458         InstanceIdentifier<Node> nodeII = (InstanceIdentifier<Node>) nodeRef.getValue();
459         NodeKey nodeKey = InstanceIdentifier.keyOf(nodeII);
460         return nodeKey.getId();
461     }
462
463     @Override
464     public void close() {
465         try {
466             if (this.listenerRegistration != null) {
467                 this.listenerRegistration.close();
468                 this.statisticsRequesterThread.destroy();
469                 this.statisticsAgerThread.destroy();
470             }
471             if (this.flowCapableTrackerRegistration != null) {
472                 this.flowCapableTrackerRegistration.close();
473                 this.flowCapableTrackerRegistration = null;
474             }
475         } catch (Exception e) {
476             spLogger.warn("Failed to stop Statistics Provider completely", e);
477         } finally {
478             spLogger.info("Statistics Provider stopped.");
479         }
480     }
481
482     void startNodeHandlers(final Collection<NodeKey> addedNodes) {
483         for (NodeKey key : addedNodes) {
484             if (handlers.containsKey(key.getId())) {
485                 spLogger.warn("Attempted to start already-existing handler for {}, very strange", key.getId());
486                 continue;
487             }
488
489             final NodeStatisticsHandler h = new NodeStatisticsHandler(dps, key);
490             final NodeStatisticsHandler old = handlers.putIfAbsent(key.getId(), h);
491             if (old == null) {
492                 spLogger.debug("Started node handler for {}", key.getId());
493
494                 // FIXME: this should be in the NodeStatisticsHandler itself
495                 sendStatisticsRequestsToNode(h);
496             } else {
497                 spLogger.debug("Prevented race on handler for {}", key.getId());
498             }
499         }
500     }
501
502     void stopNodeHandlers(final Collection<NodeKey> removedNodes) {
503         for (NodeKey key : removedNodes) {
504             final NodeStatisticsHandler s = handlers.remove(key.getId());
505             if (s != null) {
506                 spLogger.debug("Stopping node handler for {}", key.getId());
507                 s.close();
508             } else {
509                 spLogger.warn("Attempted to remove non-existing handler for {}, very strange", key.getId());
510             }
511         }
512     }
513 }