Move statistics request functions into trackers
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / NodeStatisticsHandler.java
1 /*
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.statistics.manager;
9
10 import java.util.Collection;
11 import java.util.List;
12 import java.util.concurrent.TimeUnit;
13
14 import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
15 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
16 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeaturesBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeaturesBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupFeatures;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeatures;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeaturesBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeaturesBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterFeatures;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.AggregateFlowStatistics;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.queue.id.and.statistics.map.QueueIdAndStatisticsMap;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 import com.google.common.base.Preconditions;
62 import com.google.common.util.concurrent.FutureCallback;
63 import com.google.common.util.concurrent.Futures;
64 import com.google.common.util.concurrent.ListenableFuture;
65
66 /**
67  * This class handles the lifecycle of per-node statistics. It receives data
68  * from StatisticsListener, stores it in the data store and keeps track of
69  * when the data should be removed.
70  *
71  * @author avishnoi@in.ibm.com
72  */
73 public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableContext {
74     private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class);
75     private static final int NUMBER_OF_WAIT_CYCLES = 2;
76
77     private final MultipartMessageManager msgManager = new MultipartMessageManager();
78     private final InstanceIdentifier<Node> targetNodeIdentifier;
79     private final FlowStatsTracker flowStats;
80     private final FlowTableStatsTracker flowTableStats;
81     private final GroupDescStatsTracker groupDescStats;
82     private final GroupStatsTracker groupStats;
83     private final MeterConfigStatsTracker meterConfigStats;
84     private final MeterStatsTracker meterStats;
85     private final NodeConnectorStatsTracker nodeConnectorStats;
86     private final QueueStatsTracker queueStats;
87     private final DataProviderService dps;
88     private final NodeRef targetNodeRef;
89     private final NodeKey targetNodeKey;
90
91     public NodeStatisticsHandler(final DataProviderService dps, final NodeKey nodeKey,
92             final OpendaylightFlowStatisticsService flowStatsService,
93             final OpendaylightFlowTableStatisticsService flowTableStatsService,
94             final OpendaylightGroupStatisticsService groupStatsService,
95             final OpendaylightMeterStatisticsService meterStatsService,
96             final OpendaylightPortStatisticsService portStatsService,
97             final OpendaylightQueueStatisticsService queueStatsService) {
98         this.dps = Preconditions.checkNotNull(dps);
99         this.targetNodeKey = Preconditions.checkNotNull(nodeKey);
100         this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
101         this.targetNodeRef = new NodeRef(targetNodeIdentifier);
102
103         final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(StatisticsProvider.STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES);
104
105         if (flowStatsService != null) {
106             flowStats = new FlowStatsTracker(flowStatsService, this, lifetimeNanos);
107         } else {
108             flowStats = null;
109         }
110         if (flowTableStatsService != null) {
111             flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this, lifetimeNanos);
112         } else {
113             flowTableStats = null;
114         }
115
116         if (groupStatsService != null) {
117             groupDescStats = new GroupDescStatsTracker(groupStatsService, this, lifetimeNanos);
118             groupStats = new GroupStatsTracker(groupStatsService, this, lifetimeNanos);
119         } else {
120             groupDescStats = null;
121             groupStats = null;
122         }
123         if (meterStatsService != null) {
124             meterConfigStats = new MeterConfigStatsTracker(meterStatsService, this, lifetimeNanos);
125             meterStats = new MeterStatsTracker(meterStatsService, this, lifetimeNanos);
126         } else {
127             meterConfigStats = null;
128             meterStats = null;
129         }
130         if (portStatsService != null) {
131             nodeConnectorStats = new NodeConnectorStatsTracker(portStatsService, this, lifetimeNanos);
132         } else {
133             nodeConnectorStats = null;
134         }
135         if (queueStatsService != null) {
136             queueStats = new QueueStatsTracker(queueStatsService, this, lifetimeNanos);
137         } else {
138             queueStats = null;
139         }
140     }
141
142     public NodeKey getTargetNodeKey() {
143         return targetNodeKey;
144     }
145
146     @Override
147     public InstanceIdentifier<Node> getNodeIdentifier() {
148         return targetNodeIdentifier;
149     }
150
151     @Override
152     public NodeRef getNodeRef() {
153         return targetNodeRef;
154     }
155
156     @Override
157     public DataModificationTransaction startDataModification() {
158         return dps.beginTransaction();
159     }
160
161     public synchronized void updateGroupDescStats(TransactionAware transaction, Boolean more, List<GroupDescStats> list) {
162         if (msgManager.isExpectedTransaction(transaction, more)) {
163             groupDescStats.updateStats(list);
164         }
165     }
166
167     public synchronized void updateGroupStats(TransactionAware transaction, Boolean more, List<GroupStats> list) {
168         if (msgManager.isExpectedTransaction(transaction, more)) {
169             groupStats.updateStats(list);
170         }
171     }
172
173     public synchronized void updateMeterConfigStats(TransactionAware transaction, Boolean more, List<MeterConfigStats> list) {
174         if (msgManager.isExpectedTransaction(transaction, more)) {
175             meterConfigStats.updateStats(list);
176         }
177     }
178
179     public synchronized void updateMeterStats(TransactionAware transaction, Boolean more, List<MeterStats> list) {
180         if (msgManager.isExpectedTransaction(transaction, more)) {
181             meterStats.updateStats(list);
182         }
183     }
184
185     public synchronized void updateQueueStats(TransactionAware transaction, Boolean more, List<QueueIdAndStatisticsMap> list) {
186         if (msgManager.isExpectedTransaction(transaction, more)) {
187             queueStats.updateStats(list);
188         }
189     }
190
191     public synchronized void updateFlowTableStats(TransactionAware transaction, Boolean more, List<FlowTableAndStatisticsMap> list) {
192         if (msgManager.isExpectedTransaction(transaction, more)) {
193             flowTableStats.updateStats(list);
194         }
195     }
196
197     public synchronized void updateNodeConnectorStats(TransactionAware transaction, Boolean more, List<NodeConnectorStatisticsAndPortNumberMap> list) {
198         if (msgManager.isExpectedTransaction(transaction, more)) {
199             nodeConnectorStats.updateStats(list);
200         }
201     }
202
203     public synchronized void updateAggregateFlowStats(TransactionAware transaction, Boolean more, AggregateFlowStatistics flowStats) {
204         final Short tableId = msgManager.isExpectedTableTransaction(transaction, more);
205         if (tableId != null) {
206             final DataModificationTransaction trans = dps.beginTransaction();
207             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
208                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
209
210             AggregateFlowStatisticsDataBuilder aggregateFlowStatisticsDataBuilder = new AggregateFlowStatisticsDataBuilder();
211             AggregateFlowStatisticsBuilder aggregateFlowStatisticsBuilder = new AggregateFlowStatisticsBuilder(flowStats);
212
213             aggregateFlowStatisticsDataBuilder.setAggregateFlowStatistics(aggregateFlowStatisticsBuilder.build());
214
215             logger.debug("Augment aggregate statistics: {} for table {} on Node {}",
216                     aggregateFlowStatisticsBuilder.build().toString(),tableId,targetNodeKey);
217
218             TableBuilder tableBuilder = new TableBuilder();
219             tableBuilder.setKey(new TableKey(tableId));
220             tableBuilder.addAugmentation(AggregateFlowStatisticsData.class, aggregateFlowStatisticsDataBuilder.build());
221             trans.putOperationalData(tableRef, tableBuilder.build());
222
223             trans.commit();
224         }
225     }
226
227     public synchronized void updateFlowStats(TransactionAware transaction, Boolean more, List<FlowAndStatisticsMapList> list) {
228         if (msgManager.isExpectedTransaction(transaction, more)) {
229             flowStats.updateStats(list);
230         }
231     }
232
233     public synchronized void updateGroupFeatures(GroupFeatures notification) {
234         final DataModificationTransaction trans = dps.beginTransaction();
235
236         final NodeBuilder nodeData = new NodeBuilder();
237         nodeData.setKey(targetNodeKey);
238
239         NodeGroupFeaturesBuilder nodeGroupFeatures = new NodeGroupFeaturesBuilder();
240         GroupFeaturesBuilder groupFeatures = new GroupFeaturesBuilder(notification);
241         nodeGroupFeatures.setGroupFeatures(groupFeatures.build());
242
243         //Update augmented data
244         nodeData.addAugmentation(NodeGroupFeatures.class, nodeGroupFeatures.build());
245         trans.putOperationalData(targetNodeIdentifier, nodeData.build());
246
247         // FIXME: should we be tracking this data?
248         trans.commit();
249     }
250
251     public synchronized void updateMeterFeatures(MeterFeatures features) {
252         final DataModificationTransaction trans = dps.beginTransaction();
253
254         final NodeBuilder nodeData = new NodeBuilder();
255         nodeData.setKey(targetNodeKey);
256
257         NodeMeterFeaturesBuilder nodeMeterFeatures = new NodeMeterFeaturesBuilder();
258         MeterFeaturesBuilder meterFeature = new MeterFeaturesBuilder(features);
259         nodeMeterFeatures.setMeterFeatures(meterFeature.build());
260
261         //Update augmented data
262         nodeData.addAugmentation(NodeMeterFeatures.class, nodeMeterFeatures.build());
263         trans.putOperationalData(targetNodeIdentifier, nodeData.build());
264
265         // FIXME: should we be tracking this data?
266         trans.commit();
267     }
268
269     public synchronized void cleanStaleStatistics() {
270         final DataModificationTransaction trans = dps.beginTransaction();
271         final long now = System.nanoTime();
272
273         flowStats.cleanup(trans, now);
274         groupDescStats.cleanup(trans, now);
275         groupStats.cleanup(trans, now);
276         meterConfigStats.cleanup(trans, now);
277         meterStats.cleanup(trans, now);
278         nodeConnectorStats.cleanup(trans, now);
279         queueStats.cleanup(trans, now);
280         msgManager.cleanStaleTransactionIds();
281
282         trans.commit();
283     }
284
285     public synchronized void requestPeriodicStatistics() {
286         logger.debug("Send requests for statistics collection to node : {}", targetNodeKey);
287
288         if (flowTableStats != null){
289             registerTransaction(flowTableStats.request(), StatsRequestType.ALL_FLOW);
290         }
291         if (flowStats != null){
292             // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
293             //        comes back -- we do not have any tables anyway.
294             final Collection<TableKey> tables = flowTableStats.getTables();
295             logger.debug("Node {} supports {} table(s)", targetNodeKey, tables.size());
296             for (final TableKey key : tables) {
297                 logger.debug("Send aggregate stats request for flow table {} to node {}", key.getId(), targetNodeKey);
298                 registerTableTransaction(flowStats.requestAggregateFlows(key),  key.getId());
299             }
300
301             registerTransaction(flowStats.requestAllFlowsAllTables(), StatsRequestType.ALL_FLOW);
302         }
303
304         if (nodeConnectorStats != null) {
305             registerTransaction(nodeConnectorStats.request(), StatsRequestType.ALL_PORT);
306         }
307
308         if (groupStats != null) {
309             registerTransaction(groupStats.request(), StatsRequestType.ALL_GROUP);
310         }
311         sendGroupDescriptionRequest();
312
313         if (meterStats != null) {
314             registerTransaction(meterStats.request(), StatsRequestType.ALL_METER);
315         }
316         sendMeterConfigStatisticsRequest();
317
318         if(queueStats != null) {
319             registerTransaction(queueStats.request(), StatsRequestType.ALL_QUEUE_STATS);
320         }
321     }
322
323     public synchronized void start() {
324         requestPeriodicStatistics();
325     }
326
327     @Override
328     public synchronized void close() {
329         // FIXME: cleanup any resources we hold (registrations, etc.)
330         logger.debug("Statistics handler for {} shut down", targetNodeKey.getId());
331     }
332
333     synchronized void sendFlowStatsFromTableRequest(Flow flow) {
334         if (flowStats == null) {
335             logger.debug("No Flow statistics service, not sending a request");
336             return;
337         }
338
339         registerTransaction(flowStats.requestFlow(flow), StatsRequestType.ALL_FLOW);
340     }
341
342     synchronized void sendGroupDescriptionRequest() {
343         if (groupStats == null) {
344             logger.debug("No Group Descriptor statistics service, not sending a request");
345             return;
346         }
347
348         registerTransaction(groupDescStats.request(), StatsRequestType.GROUP_DESC);
349     }
350
351     synchronized void sendMeterConfigStatisticsRequest() {
352         if (meterConfigStats == null) {
353             logger.debug("No Meter Config statistics service, not sending a request");
354             return;
355         }
356
357         registerTransaction(meterConfigStats.request(), StatsRequestType.METER_CONFIG);
358     }
359
360     synchronized void sendQueueStatsFromGivenNodeConnector(NodeConnectorId nodeConnectorId, QueueId queueId) {
361         if (queueStats == null) {
362             logger.debug("No Queue statistics service, not sending a request");
363             return;
364         }
365
366         registerTransaction(queueStats.request(nodeConnectorId, queueId), StatsRequestType.ALL_QUEUE_STATS);
367     }
368
369     private void registerTransaction(final ListenableFuture<TransactionId> future, final StatsRequestType type) {
370         Futures.addCallback(future, new FutureCallback<TransactionId>() {
371             @Override
372             public void onSuccess(TransactionId result) {
373                 msgManager.recordExpectedTransaction(result, type);
374                 logger.debug("Transaction {} for node {} sent successfully", result, targetNodeKey);
375             }
376
377             @Override
378             public void onFailure(Throwable t) {
379                 logger.warn("Failed to send statistics request for node {}", targetNodeKey, t);
380             }
381         });
382     }
383
384     private void registerTableTransaction(final ListenableFuture<TransactionId> future, final Short id) {
385         Futures.addCallback(future, new FutureCallback<TransactionId>() {
386             @Override
387             public void onSuccess(TransactionId result) {
388                 msgManager.recordExpectedTableTransaction(result, StatsRequestType.AGGR_FLOW, id);
389                 logger.debug("Transaction {} for node {} table {} sent successfully", result, targetNodeKey, id);
390             }
391
392             @Override
393             public void onFailure(Throwable t) {
394                 logger.warn("Failed to send table statistics request for node {} table {}", targetNodeKey, id, t);
395             }
396         });
397     }
398 }