Merge "Bug fixes for netconf southbound plugin."
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / NodeStatisticsHandler.java
1 /*
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.statistics.manager;
9
10 import java.util.Collection;
11 import java.util.List;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15
16 import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
17 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
18 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeaturesBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeaturesBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupFeatures;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsOutput;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeatures;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeaturesBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeaturesBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterFeatures;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.AggregateFlowStatistics;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsOutput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortInputBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortOutput;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.queue.id.and.statistics.map.QueueIdAndStatisticsMap;
81 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
82 import org.opendaylight.yangtools.yang.common.RpcResult;
83 import org.slf4j.Logger;
84 import org.slf4j.LoggerFactory;
85
86 import com.google.common.base.Preconditions;
87
88 /**
89  * This class handles the lifecycle of per-node statistics. It receives data
90  * from StatisticsListener, stores it in the data store and keeps track of
91  * when the data should be removed.
92  *
93  * @author avishnoi@in.ibm.com
94  */
95 public final class NodeStatisticsHandler implements AutoCloseable {
96     private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class);
97     private static final int NUMBER_OF_WAIT_CYCLES = 2;
98
99     private final OpendaylightFlowStatisticsService flowStatsService;
100     private final OpendaylightFlowTableStatisticsService flowTableStatsService;
101     private final OpendaylightGroupStatisticsService groupStatsService;
102     private final OpendaylightMeterStatisticsService meterStatsService;
103     private final OpendaylightPortStatisticsService portStatsService;
104     private final OpendaylightQueueStatisticsService queueStatsService;
105
106     private final MultipartMessageManager msgManager = new MultipartMessageManager();
107     private final InstanceIdentifier<Node> targetNodeIdentifier;
108     private final FlowStatsTracker flowStats;
109     private final FlowTableStatsTracker flowTableStats;
110     private final GroupDescStatsTracker groupDescStats;
111     private final GroupStatsTracker groupStats;
112     private final MeterConfigStatsTracker meterConfigStats;
113     private final MeterStatsTracker meterStats;
114     private final NodeConnectorStatsTracker nodeConnectorStats;
115     private final QueueStatsTracker queueStats;
116     private final DataProviderService dps;
117     private final NodeRef targetNodeRef;
118     private final NodeKey targetNodeKey;
119
120     public NodeStatisticsHandler(final DataProviderService dps, final NodeKey nodeKey,
121             final OpendaylightFlowStatisticsService flowStatsService,
122             final OpendaylightFlowTableStatisticsService flowTableStatsService,
123             final OpendaylightGroupStatisticsService groupStatsService,
124             final OpendaylightMeterStatisticsService meterStatsService,
125             final OpendaylightPortStatisticsService portStatsService,
126             final OpendaylightQueueStatisticsService queueStatsService) {
127         this.dps = Preconditions.checkNotNull(dps);
128         this.targetNodeKey = Preconditions.checkNotNull(nodeKey);
129         this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
130         this.targetNodeRef = new NodeRef(targetNodeIdentifier);
131
132         this.flowStatsService = flowStatsService;
133         this.flowTableStatsService = flowTableStatsService;
134         this.groupStatsService = groupStatsService;
135         this.meterStatsService = meterStatsService;
136         this.portStatsService = portStatsService;
137         this.queueStatsService = queueStatsService;
138
139         final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(StatisticsProvider.STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES);
140         flowStats = new FlowStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
141         flowTableStats = new FlowTableStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
142         groupDescStats = new GroupDescStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
143         groupStats = new GroupStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
144         meterConfigStats = new MeterConfigStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
145         meterStats = new MeterStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
146         nodeConnectorStats = new NodeConnectorStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
147         queueStats = new QueueStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
148     }
149
150     public NodeKey getTargetNodeKey() {
151         return targetNodeKey;
152     }
153
154     public Collection<TableKey> getKnownTables() {
155         return flowTableStats.getTables();
156     }
157
158     public InstanceIdentifier<Node> getTargetNodeIdentifier() {
159         return targetNodeIdentifier;
160     }
161
162     public NodeRef getTargetNodeRef() {
163         return targetNodeRef;
164     }
165
166     public synchronized void updateGroupDescStats(TransactionAware transaction, Boolean more, List<GroupDescStats> list) {
167         if (msgManager.isExpectedTransaction(transaction, more)) {
168             groupDescStats.updateStats(list);
169         }
170     }
171
172     public synchronized void updateGroupStats(TransactionAware transaction, Boolean more, List<GroupStats> list) {
173         if (msgManager.isExpectedTransaction(transaction, more)) {
174             groupStats.updateStats(list);
175         }
176     }
177
178     public synchronized void updateMeterConfigStats(TransactionAware transaction, Boolean more, List<MeterConfigStats> list) {
179         if (msgManager.isExpectedTransaction(transaction, more)) {
180             meterConfigStats.updateStats(list);
181         }
182     }
183
184     public synchronized void updateMeterStats(TransactionAware transaction, Boolean more, List<MeterStats> list) {
185         if (msgManager.isExpectedTransaction(transaction, more)) {
186             meterStats.updateStats(list);
187         }
188     }
189
190     public synchronized void updateQueueStats(TransactionAware transaction, Boolean more, List<QueueIdAndStatisticsMap> list) {
191         if (msgManager.isExpectedTransaction(transaction, more)) {
192             queueStats.updateStats(list);
193         }
194     }
195
196     public synchronized void updateFlowTableStats(TransactionAware transaction, Boolean more, List<FlowTableAndStatisticsMap> list) {
197         if (msgManager.isExpectedTransaction(transaction, more)) {
198             flowTableStats.updateStats(list);
199         }
200     }
201
202     public synchronized void updateNodeConnectorStats(TransactionAware transaction, Boolean more, List<NodeConnectorStatisticsAndPortNumberMap> list) {
203         if (msgManager.isExpectedTransaction(transaction, more)) {
204             nodeConnectorStats.updateStats(list);
205         }
206     }
207
208     public synchronized void updateAggregateFlowStats(TransactionAware transaction, Boolean more, AggregateFlowStatistics flowStats) {
209         final Short tableId = msgManager.isExpectedTableTransaction(transaction, more);
210         if (tableId != null) {
211             final DataModificationTransaction trans = dps.beginTransaction();
212             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
213                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
214
215             AggregateFlowStatisticsDataBuilder aggregateFlowStatisticsDataBuilder = new AggregateFlowStatisticsDataBuilder();
216             AggregateFlowStatisticsBuilder aggregateFlowStatisticsBuilder = new AggregateFlowStatisticsBuilder(flowStats);
217
218             aggregateFlowStatisticsDataBuilder.setAggregateFlowStatistics(aggregateFlowStatisticsBuilder.build());
219
220             logger.debug("Augment aggregate statistics: {} for table {} on Node {}",
221                     aggregateFlowStatisticsBuilder.build().toString(),tableId,targetNodeKey);
222
223             TableBuilder tableBuilder = new TableBuilder();
224             tableBuilder.setKey(new TableKey(tableId));
225             tableBuilder.addAugmentation(AggregateFlowStatisticsData.class, aggregateFlowStatisticsDataBuilder.build());
226             trans.putOperationalData(tableRef, tableBuilder.build());
227
228             trans.commit();
229         }
230     }
231
232     public synchronized void updateFlowStats(TransactionAware transaction, Boolean more, List<FlowAndStatisticsMapList> list) {
233         if (msgManager.isExpectedTransaction(transaction, more)) {
234             flowStats.updateStats(list);
235         }
236     }
237
238     public synchronized void updateGroupFeatures(GroupFeatures notification) {
239         final DataModificationTransaction trans = dps.beginTransaction();
240
241         final NodeBuilder nodeData = new NodeBuilder();
242         nodeData.setKey(targetNodeKey);
243
244         NodeGroupFeaturesBuilder nodeGroupFeatures = new NodeGroupFeaturesBuilder();
245         GroupFeaturesBuilder groupFeatures = new GroupFeaturesBuilder(notification);
246         nodeGroupFeatures.setGroupFeatures(groupFeatures.build());
247
248         //Update augmented data
249         nodeData.addAugmentation(NodeGroupFeatures.class, nodeGroupFeatures.build());
250         trans.putOperationalData(targetNodeIdentifier, nodeData.build());
251
252         // FIXME: should we be tracking this data?
253         trans.commit();
254     }
255
256     public synchronized void updateMeterFeatures(MeterFeatures features) {
257         final DataModificationTransaction trans = dps.beginTransaction();
258
259         final NodeBuilder nodeData = new NodeBuilder();
260         nodeData.setKey(targetNodeKey);
261
262         NodeMeterFeaturesBuilder nodeMeterFeatures = new NodeMeterFeaturesBuilder();
263         MeterFeaturesBuilder meterFeature = new MeterFeaturesBuilder(features);
264         nodeMeterFeatures.setMeterFeatures(meterFeature.build());
265
266         //Update augmented data
267         nodeData.addAugmentation(NodeMeterFeatures.class, nodeMeterFeatures.build());
268         trans.putOperationalData(targetNodeIdentifier, nodeData.build());
269
270         // FIXME: should we be tracking this data?
271         trans.commit();
272     }
273
274     public synchronized void cleanStaleStatistics() {
275         final DataModificationTransaction trans = dps.beginTransaction();
276         final long now = System.nanoTime();
277
278         flowStats.cleanup(trans, now);
279         groupDescStats.cleanup(trans, now);
280         groupStats.cleanup(trans, now);
281         meterConfigStats.cleanup(trans, now);
282         meterStats.cleanup(trans, now);
283         nodeConnectorStats.cleanup(trans, now);
284         queueStats.cleanup(trans, now);
285         msgManager.cleanStaleTransactionIds();
286
287         trans.commit();
288     }
289
290     public synchronized void requestPeriodicStatistics() {
291         logger.debug("Send requests for statistics collection to node : {}", targetNodeKey);
292
293         try{
294             if(flowTableStatsService != null){
295                 final GetFlowTablesStatisticsInputBuilder input = new GetFlowTablesStatisticsInputBuilder();
296                 input.setNode(targetNodeRef);
297
298                 Future<RpcResult<GetFlowTablesStatisticsOutput>> response = flowTableStatsService.getFlowTablesStatistics(input.build());
299                 recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW_TABLE);
300             }
301             if(flowStatsService != null){
302                 // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
303                 //        comes back -- we do not have any tables anyway.
304                 sendAggregateFlowsStatsFromAllTablesRequest();
305
306                 sendAllFlowsStatsFromAllTablesRequest();
307             }
308             if(portStatsService != null){
309                 sendAllNodeConnectorsStatisticsRequest();
310             }
311             if(groupStatsService != null){
312                 sendAllGroupStatisticsRequest();
313                 sendGroupDescriptionRequest();
314             }
315             if(meterStatsService != null){
316                 sendAllMeterStatisticsRequest();
317                 sendMeterConfigStatisticsRequest();
318             }
319             if(queueStatsService != null){
320                 sendAllQueueStatsFromAllNodeConnector();
321             }
322         } catch(Exception e) {
323             logger.error("Exception occured while sending statistics requests", e);
324         }
325     }
326
327     public synchronized void start() {
328         requestPeriodicStatistics();
329     }
330
331     @Override
332     public synchronized void close() {
333         // FIXME: cleanup any resources we hold (registrations, etc.)
334         logger.debug("Statistics handler for {} shut down", targetNodeKey.getId());
335     }
336
337     synchronized void sendFlowStatsFromTableRequest(Flow flow) throws InterruptedException, ExecutionException{
338         final GetFlowStatisticsFromFlowTableInputBuilder input =
339                 new GetFlowStatisticsFromFlowTableInputBuilder(flow);
340
341         input.setNode(targetNodeRef);
342
343         Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response =
344                 flowStatsService.getFlowStatisticsFromFlowTable(input.build());
345
346         recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW);
347     }
348
349     synchronized void sendGroupDescriptionRequest() throws InterruptedException, ExecutionException{
350         final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
351
352         input.setNode(targetNodeRef);
353
354         Future<RpcResult<GetGroupDescriptionOutput>> response =
355                 groupStatsService.getGroupDescription(input.build());
356
357         recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.GROUP_DESC);
358     }
359
360     synchronized void sendMeterConfigStatisticsRequest() throws InterruptedException, ExecutionException{
361
362         GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
363
364         input.setNode(targetNodeRef);
365
366         Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
367                 meterStatsService.getAllMeterConfigStatistics(input.build());
368
369         recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.METER_CONFIG);
370     }
371
372     synchronized void sendQueueStatsFromGivenNodeConnector(NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
373         GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder();
374
375         input.setNode(targetNodeRef);
376         input.setNodeConnectorId(nodeConnectorId);
377         input.setQueueId(queueId);
378         Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response =
379                 queueStatsService.getQueueStatisticsFromGivenPort(input.build());
380
381         recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);;
382     }
383
384     private void sendAllMeterStatisticsRequest() throws InterruptedException, ExecutionException{
385
386         GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
387
388         input.setNode(targetNodeRef);
389
390         Future<RpcResult<GetAllMeterStatisticsOutput>> response =
391                 meterStatsService.getAllMeterStatistics(input.build());
392
393         recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_METER);
394     }
395
396     private void sendAllFlowsStatsFromAllTablesRequest() throws InterruptedException, ExecutionException{
397         final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input = new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
398         input.setNode(targetNodeRef);
399
400         Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response = flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
401
402         recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW);
403     }
404
405     private void sendAggregateFlowsStatsFromAllTablesRequest() throws InterruptedException, ExecutionException{
406         final Collection<TableKey> tables = getKnownTables();
407         logger.debug("Node {} supports {} table(s)", targetNodeKey, tables.size());
408
409         for (TableKey key : tables) {
410             sendAggregateFlowsStatsFromTableRequest(key.getId().shortValue());
411         }
412     }
413
414     private void sendAggregateFlowsStatsFromTableRequest(Short tableId) throws InterruptedException, ExecutionException{
415         logger.debug("Send aggregate stats request for flow table {} to node {}",tableId, targetNodeKey);
416         GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
417                 new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
418
419         input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
420         input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId));
421         Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
422                 flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
423
424         recordExpectedTableTransaction(response.get().getResult().getTransactionId(), tableId);
425     }
426
427     private void sendAllQueueStatsFromAllNodeConnector() throws InterruptedException, ExecutionException {
428         GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
429
430         input.setNode(targetNodeRef);
431
432         Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
433                 queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
434
435         recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);
436     }
437
438     private void sendAllNodeConnectorsStatisticsRequest() throws InterruptedException, ExecutionException{
439         final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
440
441         input.setNode(targetNodeRef);
442
443         Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
444                 portStatsService.getAllNodeConnectorsStatistics(input.build());
445         recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_PORT);
446     }
447
448     private void sendAllGroupStatisticsRequest() throws InterruptedException, ExecutionException{
449         final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
450         input.setNode(targetNodeRef);
451
452         Future<RpcResult<GetAllGroupStatisticsOutput>> response =
453                 groupStatsService.getAllGroupStatistics(input.build());
454
455         recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_GROUP);
456     }
457
458     private void recordExpectedTransaction(TransactionId transactionId, StatsRequestType reqType) {
459         msgManager.recordExpectedTransaction(transactionId, reqType);
460     }
461
462     private void recordExpectedTableTransaction(TransactionId transactionId, Short tableId) {
463         msgManager.recordExpectedTableTransaction(transactionId, StatsRequestType.AGGR_FLOW, tableId);
464     }
465 }