Keep track of the tables we discover
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / NodeStatisticsHandler.java
1 /*
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.statistics.manager;
9
10 import java.util.Collection;
11 import java.util.Collections;
12 import java.util.HashMap;
13 import java.util.HashSet;
14 import java.util.Iterator;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.Map.Entry;
18 import java.util.Set;
19 import java.util.concurrent.TimeUnit;
20
21 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsData;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsDataBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatistics;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatisticsBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueKey;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStatsBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeaturesBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatisticsBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.desc.GroupDescBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeaturesBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.statistics.GroupStatisticsBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupFeatures;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStats;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStatsBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeatures;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeaturesBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatisticsBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeaturesBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterConfigStatsBuilder;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterStatisticsBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterFeatures;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.AggregateFlowStatistics;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.flow.capable.node.connector.statistics.FlowCapableNodeConnectorStatisticsBuilder;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsDataBuilder;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.flow.capable.node.connector.queue.statistics.FlowCapableNodeConnectorQueueStatisticsBuilder;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.queue.id.and.statistics.map.QueueIdAndStatisticsMap;
96 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
97 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
98 import org.slf4j.Logger;
99 import org.slf4j.LoggerFactory;
100
101 import com.google.common.base.Preconditions;
102
103 /**
104  * This class handles the lifecycle of per-node statistics. It receives data
105  * from StatisticsListener, stores it in the data store and keeps track of
106  * when the data should be removed.
107  *
108  * @author avishnoi@in.ibm.com
109  */
110 public class NodeStatisticsHandler implements AutoCloseable {
111     private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class);
112     private static final int NUMBER_OF_WAIT_CYCLES = 2;
113
114     private final Map<GroupDescStats,Long> groupDescStatsUpdate = new HashMap<>();
115     private final Map<MeterConfigStats,Long> meterConfigStatsUpdate = new HashMap<>();
116     private final Map<FlowEntry,Long> flowStatsUpdate = new HashMap<>();
117     private final Map<QueueEntry,Long> queuesStatsUpdate = new HashMap<>();
118     private final InstanceIdentifier<Node> targetNodeIdentifier;
119     private final StatisticsProvider statisticsProvider;
120     private final NodeKey targetNodeKey;
121     private Collection<TableKey> knownTables = Collections.emptySet();
122     private int unaccountedFlowsCounter = 1;
123
124     public NodeStatisticsHandler(StatisticsProvider statisticsProvider, NodeKey nodeKey){
125         this.statisticsProvider = Preconditions.checkNotNull(statisticsProvider);
126         this.targetNodeKey = Preconditions.checkNotNull(nodeKey);
127         this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
128     }
129
130     private static class FlowEntry {
131         private final Short tableId;
132         private final Flow flow;
133
134         public FlowEntry(Short tableId, Flow flow){
135             this.tableId = tableId;
136             this.flow = flow;
137         }
138
139         public Short getTableId() {
140             return tableId;
141         }
142
143         public Flow getFlow() {
144             return flow;
145         }
146
147         @Override
148         public int hashCode() {
149             final int prime = 31;
150             int result = 1;
151             result = prime * result + ((flow == null) ? 0 : flow.hashCode());
152             result = prime * result + ((tableId == null) ? 0 : tableId.hashCode());
153             return result;
154         }
155
156         @Override
157         public boolean equals(Object obj) {
158             if (this == obj)
159                 return true;
160             if (obj == null)
161                 return false;
162             if (getClass() != obj.getClass())
163                 return false;
164             FlowEntry other = (FlowEntry) obj;
165             if (flow == null) {
166                 if (other.flow != null)
167                     return false;
168             } else if (!flow.equals(other.flow))
169                 return false;
170             if (tableId == null) {
171                 if (other.tableId != null)
172                     return false;
173             } else if (!tableId.equals(other.tableId))
174                 return false;
175             return true;
176         }
177     }
178
179     private static final class QueueEntry {
180         private final NodeConnectorId nodeConnectorId;
181         private final QueueId queueId;
182         public QueueEntry(NodeConnectorId ncId, QueueId queueId){
183             this.nodeConnectorId = ncId;
184             this.queueId = queueId;
185         }
186         public NodeConnectorId getNodeConnectorId() {
187             return nodeConnectorId;
188         }
189         public QueueId getQueueId() {
190             return queueId;
191         }
192         @Override
193         public int hashCode() {
194             final int prime = 31;
195             int result = 1;
196             result = prime * result + ((nodeConnectorId == null) ? 0 : nodeConnectorId.hashCode());
197             result = prime * result + ((queueId == null) ? 0 : queueId.hashCode());
198             return result;
199         }
200         @Override
201         public boolean equals(Object obj) {
202             if (this == obj) {
203                 return true;
204             }
205             if (obj == null) {
206                 return false;
207             }
208             if (!(obj instanceof QueueEntry)) {
209                 return false;
210             }
211             QueueEntry other = (QueueEntry) obj;
212             if (nodeConnectorId == null) {
213                 if (other.nodeConnectorId != null) {
214                     return false;
215                 }
216             } else if (!nodeConnectorId.equals(other.nodeConnectorId)) {
217                 return false;
218             }
219             if (queueId == null) {
220                 if (other.queueId != null) {
221                     return false;
222                 }
223             } else if (!queueId.equals(other.queueId)) {
224                 return false;
225             }
226             return true;
227         }
228     }
229
230     public NodeKey getTargetNodeKey() {
231         return targetNodeKey;
232     }
233
234     public Collection<TableKey> getKnownTables() {
235         return knownTables;
236     }
237
238     public synchronized void updateGroupDescStats(List<GroupDescStats> list){
239         final Long expiryTime = getExpiryTime();
240         final DataModificationTransaction trans = statisticsProvider.startChange();
241
242         for (GroupDescStats groupDescStats : list) {
243             GroupBuilder groupBuilder = new GroupBuilder();
244             GroupKey groupKey = new GroupKey(groupDescStats.getGroupId());
245             groupBuilder.setKey(groupKey);
246
247             InstanceIdentifier<Group> groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
248                                                                                         .augmentation(FlowCapableNode.class)
249                                                                                         .child(Group.class,groupKey).toInstance();
250
251             NodeGroupDescStatsBuilder groupDesc= new NodeGroupDescStatsBuilder();
252             GroupDescBuilder stats = new GroupDescBuilder();
253             stats.fieldsFrom(groupDescStats);
254             groupDesc.setGroupDesc(stats.build());
255
256             //Update augmented data
257             groupBuilder.addAugmentation(NodeGroupDescStats.class, groupDesc.build());
258
259             trans.putOperationalData(groupRef, groupBuilder.build());
260             this.groupDescStatsUpdate.put(groupDescStats, expiryTime);
261         }
262
263         trans.commit();
264     }
265
266     public synchronized void updateGroupStats(List<GroupStats> list) {
267         final DataModificationTransaction trans = statisticsProvider.startChange();
268
269         for(GroupStats groupStats : list) {
270             GroupBuilder groupBuilder = new GroupBuilder();
271             GroupKey groupKey = new GroupKey(groupStats.getGroupId());
272             groupBuilder.setKey(groupKey);
273
274             InstanceIdentifier<Group> groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
275                                                                                         .augmentation(FlowCapableNode.class)
276                                                                                         .child(Group.class,groupKey).toInstance();
277
278             NodeGroupStatisticsBuilder groupStatisticsBuilder= new NodeGroupStatisticsBuilder();
279             GroupStatisticsBuilder stats = new GroupStatisticsBuilder();
280             stats.fieldsFrom(groupStats);
281             groupStatisticsBuilder.setGroupStatistics(stats.build());
282
283             //Update augmented data
284             groupBuilder.addAugmentation(NodeGroupStatistics.class, groupStatisticsBuilder.build());
285             trans.putOperationalData(groupRef, groupBuilder.build());
286
287             // FIXME: should we be tracking this data?
288         }
289
290         trans.commit();
291     }
292
293     public synchronized void updateMeterConfigStats(List<MeterConfigStats> list) {
294         final Long expiryTime = getExpiryTime();
295         final DataModificationTransaction trans = statisticsProvider.startChange();
296
297         for(MeterConfigStats meterConfigStats : list) {
298             MeterBuilder meterBuilder = new MeterBuilder();
299             MeterKey meterKey = new MeterKey(meterConfigStats.getMeterId());
300             meterBuilder.setKey(meterKey);
301
302             InstanceIdentifier<Meter> meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
303                                                                                         .augmentation(FlowCapableNode.class)
304                                                                                         .child(Meter.class,meterKey).toInstance();
305
306             NodeMeterConfigStatsBuilder meterConfig= new NodeMeterConfigStatsBuilder();
307             MeterConfigStatsBuilder stats = new MeterConfigStatsBuilder();
308             stats.fieldsFrom(meterConfigStats);
309             meterConfig.setMeterConfigStats(stats.build());
310
311             //Update augmented data
312             meterBuilder.addAugmentation(NodeMeterConfigStats.class, meterConfig.build());
313
314             trans.putOperationalData(meterRef, meterBuilder.build());
315             this.meterConfigStatsUpdate.put(meterConfigStats, expiryTime);
316         }
317
318         trans.commit();
319     }
320
321
322     public synchronized void updateMeterStats(List<MeterStats> list) {
323         final DataModificationTransaction trans = statisticsProvider.startChange();
324
325         for(MeterStats meterStats : list) {
326             MeterBuilder meterBuilder = new MeterBuilder();
327             MeterKey meterKey = new MeterKey(meterStats.getMeterId());
328             meterBuilder.setKey(meterKey);
329
330             InstanceIdentifier<Meter> meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
331                                                                                         .augmentation(FlowCapableNode.class)
332                                                                                         .child(Meter.class,meterKey).toInstance();
333
334             NodeMeterStatisticsBuilder meterStatsBuilder= new NodeMeterStatisticsBuilder();
335             MeterStatisticsBuilder stats = new MeterStatisticsBuilder();
336             stats.fieldsFrom(meterStats);
337             meterStatsBuilder.setMeterStatistics(stats.build());
338
339             //Update augmented data
340             meterBuilder.addAugmentation(NodeMeterStatistics.class, meterStatsBuilder.build());
341             trans.putOperationalData(meterRef, meterBuilder.build());
342
343             // FIXME: should we be tracking this data?
344         }
345
346         trans.commit();
347     }
348
349     public synchronized void updateQueueStats(List<QueueIdAndStatisticsMap> list) {
350         final Long expiryTime = getExpiryTime();
351         final DataModificationTransaction trans = statisticsProvider.startChange();
352
353         for (QueueIdAndStatisticsMap swQueueStats : list) {
354
355             QueueEntry queueEntry = new QueueEntry(swQueueStats.getNodeConnectorId(),swQueueStats.getQueueId());
356
357             FlowCapableNodeConnectorQueueStatisticsDataBuilder queueStatisticsDataBuilder = new FlowCapableNodeConnectorQueueStatisticsDataBuilder();
358
359             FlowCapableNodeConnectorQueueStatisticsBuilder queueStatisticsBuilder = new FlowCapableNodeConnectorQueueStatisticsBuilder();
360
361             queueStatisticsBuilder.fieldsFrom(swQueueStats);
362
363             queueStatisticsDataBuilder.setFlowCapableNodeConnectorQueueStatistics(queueStatisticsBuilder.build());
364
365             InstanceIdentifier<Queue> queueRef
366                     = InstanceIdentifier.builder(Nodes.class)
367                                         .child(Node.class, targetNodeKey)
368                                         .child(NodeConnector.class, new NodeConnectorKey(swQueueStats.getNodeConnectorId()))
369                                         .augmentation(FlowCapableNodeConnector.class)
370                                         .child(Queue.class, new QueueKey(swQueueStats.getQueueId())).toInstance();
371
372             QueueBuilder queueBuilder = new QueueBuilder();
373             FlowCapableNodeConnectorQueueStatisticsData qsd = queueStatisticsDataBuilder.build();
374             queueBuilder.addAugmentation(FlowCapableNodeConnectorQueueStatisticsData.class, qsd);
375             queueBuilder.setKey(new QueueKey(swQueueStats.getQueueId()));
376
377             logger.debug("Augmenting queue statistics {} of queue {} to port {}",
378                                         qsd,
379                                         swQueueStats.getQueueId(),
380                                         swQueueStats.getNodeConnectorId());
381
382             trans.putOperationalData(queueRef, queueBuilder.build());
383             this.queuesStatsUpdate.put(queueEntry, expiryTime);
384         }
385
386         trans.commit();
387     }
388
389     public synchronized void updateFlowTableStats(List<FlowTableAndStatisticsMap> list) {
390         final DataModificationTransaction trans = statisticsProvider.startChange();
391
392         final Set<TableKey> knownTables = new HashSet<>(list.size());
393         for (FlowTableAndStatisticsMap ftStats : list) {
394
395             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
396                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(ftStats.getTableId().getValue())).toInstance();
397
398             FlowTableStatisticsDataBuilder statisticsDataBuilder = new FlowTableStatisticsDataBuilder();
399             final FlowTableStatistics stats = new FlowTableStatisticsBuilder(ftStats).build();
400             statisticsDataBuilder.setFlowTableStatistics(stats);
401
402             logger.debug("Augment flow table statistics: {} for table {} on Node {}",
403                     stats,ftStats.getTableId(), targetNodeKey);
404
405             TableBuilder tableBuilder = new TableBuilder();
406             tableBuilder.setKey(new TableKey(ftStats.getTableId().getValue()));
407             tableBuilder.addAugmentation(FlowTableStatisticsData.class, statisticsDataBuilder.build());
408             trans.putOperationalData(tableRef, tableBuilder.build());
409
410             knownTables.add(tableBuilder.getKey());
411         }
412
413         this.knownTables = Collections.unmodifiableCollection(knownTables);
414         trans.commit();
415     }
416
417     public synchronized void updateNodeConnectorStats(List<NodeConnectorStatisticsAndPortNumberMap> list) {
418         final DataModificationTransaction trans = statisticsProvider.startChange();
419
420         for(NodeConnectorStatisticsAndPortNumberMap portStats : list) {
421
422             FlowCapableNodeConnectorStatisticsBuilder statisticsBuilder
423                                             = new FlowCapableNodeConnectorStatisticsBuilder();
424             statisticsBuilder.setBytes(portStats.getBytes());
425             statisticsBuilder.setCollisionCount(portStats.getCollisionCount());
426             statisticsBuilder.setDuration(portStats.getDuration());
427             statisticsBuilder.setPackets(portStats.getPackets());
428             statisticsBuilder.setReceiveCrcError(portStats.getReceiveCrcError());
429             statisticsBuilder.setReceiveDrops(portStats.getReceiveDrops());
430             statisticsBuilder.setReceiveErrors(portStats.getReceiveErrors());
431             statisticsBuilder.setReceiveFrameError(portStats.getReceiveFrameError());
432             statisticsBuilder.setReceiveOverRunError(portStats.getReceiveOverRunError());
433             statisticsBuilder.setTransmitDrops(portStats.getTransmitDrops());
434             statisticsBuilder.setTransmitErrors(portStats.getTransmitErrors());
435
436             //Augment data to the node-connector
437             FlowCapableNodeConnectorStatisticsDataBuilder statisticsDataBuilder =
438                     new FlowCapableNodeConnectorStatisticsDataBuilder();
439
440             statisticsDataBuilder.setFlowCapableNodeConnectorStatistics(statisticsBuilder.build());
441
442             InstanceIdentifier<NodeConnector> nodeConnectorRef = InstanceIdentifier.builder(Nodes.class)
443                     .child(Node.class, targetNodeKey)
444                     .child(NodeConnector.class, new NodeConnectorKey(portStats.getNodeConnectorId())).toInstance();
445
446             // FIXME: can we bypass this read?
447             NodeConnector nodeConnector = (NodeConnector)trans.readOperationalData(nodeConnectorRef);
448             if(nodeConnector != null){
449                 final FlowCapableNodeConnectorStatisticsData stats = statisticsDataBuilder.build();
450                 logger.debug("Augmenting port statistics {} to port {}",stats,nodeConnectorRef.toString());
451                 NodeConnectorBuilder nodeConnectorBuilder = new NodeConnectorBuilder();
452                 nodeConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, stats);
453                 trans.putOperationalData(nodeConnectorRef, nodeConnectorBuilder.build());
454             }
455
456             // FIXME: should we be tracking this data?
457         }
458
459         trans.commit();
460     }
461
462     public synchronized void updateAggregateFlowStats(Short tableId, AggregateFlowStatistics flowStats) {
463         if (tableId != null) {
464             final DataModificationTransaction trans = statisticsProvider.startChange();
465
466
467             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
468                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
469
470             AggregateFlowStatisticsDataBuilder aggregateFlowStatisticsDataBuilder = new AggregateFlowStatisticsDataBuilder();
471             AggregateFlowStatisticsBuilder aggregateFlowStatisticsBuilder = new AggregateFlowStatisticsBuilder(flowStats);
472
473             aggregateFlowStatisticsDataBuilder.setAggregateFlowStatistics(aggregateFlowStatisticsBuilder.build());
474
475             logger.debug("Augment aggregate statistics: {} for table {} on Node {}",
476                     aggregateFlowStatisticsBuilder.build().toString(),tableId,targetNodeKey);
477
478             TableBuilder tableBuilder = new TableBuilder();
479             tableBuilder.setKey(new TableKey(tableId));
480             tableBuilder.addAugmentation(AggregateFlowStatisticsData.class, aggregateFlowStatisticsDataBuilder.build());
481             trans.putOperationalData(tableRef, tableBuilder.build());
482
483             // FIXME: should we be tracking this data?
484             trans.commit();
485         }
486     }
487
488     public synchronized void updateGroupFeatures(GroupFeatures notification) {
489         final DataModificationTransaction trans = statisticsProvider.startChange();
490
491         final NodeBuilder nodeData = new NodeBuilder();
492         nodeData.setKey(targetNodeKey);
493
494         NodeGroupFeaturesBuilder nodeGroupFeatures = new NodeGroupFeaturesBuilder();
495         GroupFeaturesBuilder groupFeatures = new GroupFeaturesBuilder(notification);
496         nodeGroupFeatures.setGroupFeatures(groupFeatures.build());
497
498         //Update augmented data
499         nodeData.addAugmentation(NodeGroupFeatures.class, nodeGroupFeatures.build());
500         trans.putOperationalData(targetNodeIdentifier, nodeData.build());
501
502         // FIXME: should we be tracking this data?
503         trans.commit();
504     }
505
506     public synchronized void updateMeterFeatures(MeterFeatures features) {
507         final DataModificationTransaction trans = statisticsProvider.startChange();
508
509         final NodeBuilder nodeData = new NodeBuilder();
510         nodeData.setKey(targetNodeKey);
511
512         NodeMeterFeaturesBuilder nodeMeterFeatures = new NodeMeterFeaturesBuilder();
513         MeterFeaturesBuilder meterFeature = new MeterFeaturesBuilder(features);
514         nodeMeterFeatures.setMeterFeatures(meterFeature.build());
515
516         //Update augmented data
517         nodeData.addAugmentation(NodeMeterFeatures.class, nodeMeterFeatures.build());
518         trans.putOperationalData(targetNodeIdentifier, nodeData.build());
519
520         // FIXME: should we be tracking this data?
521         trans.commit();
522     }
523
524     public synchronized void updateFlowStats(List<FlowAndStatisticsMapList> list) {
525         final Long expiryTime = getExpiryTime();
526         final DataModificationTransaction trans = statisticsProvider.startChange();
527
528         for(FlowAndStatisticsMapList map : list) {
529             short tableId = map.getTableId();
530             boolean foundOriginalFlow = false;
531
532             FlowBuilder flowBuilder = new FlowBuilder();
533
534             FlowStatisticsDataBuilder flowStatisticsData = new FlowStatisticsDataBuilder();
535
536             FlowBuilder flow = new FlowBuilder();
537             flow.setContainerName(map.getContainerName());
538             flow.setBufferId(map.getBufferId());
539             flow.setCookie(map.getCookie());
540             flow.setCookieMask(map.getCookieMask());
541             flow.setFlags(map.getFlags());
542             flow.setFlowName(map.getFlowName());
543             flow.setHardTimeout(map.getHardTimeout());
544             if(map.getFlowId() != null)
545                 flow.setId(new FlowId(map.getFlowId().getValue()));
546             flow.setIdleTimeout(map.getIdleTimeout());
547             flow.setInstallHw(map.isInstallHw());
548             flow.setInstructions(map.getInstructions());
549             if(map.getFlowId()!= null)
550                 flow.setKey(new FlowKey(new FlowId(map.getKey().getFlowId().getValue())));
551             flow.setMatch(map.getMatch());
552             flow.setOutGroup(map.getOutGroup());
553             flow.setOutPort(map.getOutPort());
554             flow.setPriority(map.getPriority());
555             flow.setStrict(map.isStrict());
556             flow.setTableId(tableId);
557
558             Flow flowRule = flow.build();
559
560             FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder();
561             stats.setByteCount(map.getByteCount());
562             stats.setPacketCount(map.getPacketCount());
563             stats.setDuration(map.getDuration());
564
565             GenericStatistics flowStats = stats.build();
566
567             //Augment the data to the flow node
568
569             FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder();
570             flowStatistics.setByteCount(flowStats.getByteCount());
571             flowStatistics.setPacketCount(flowStats.getPacketCount());
572             flowStatistics.setDuration(flowStats.getDuration());
573             flowStatistics.setContainerName(map.getContainerName());
574             flowStatistics.setBufferId(map.getBufferId());
575             flowStatistics.setCookie(map.getCookie());
576             flowStatistics.setCookieMask(map.getCookieMask());
577             flowStatistics.setFlags(map.getFlags());
578             flowStatistics.setFlowName(map.getFlowName());
579             flowStatistics.setHardTimeout(map.getHardTimeout());
580             flowStatistics.setIdleTimeout(map.getIdleTimeout());
581             flowStatistics.setInstallHw(map.isInstallHw());
582             flowStatistics.setInstructions(map.getInstructions());
583             flowStatistics.setMatch(map.getMatch());
584             flowStatistics.setOutGroup(map.getOutGroup());
585             flowStatistics.setOutPort(map.getOutPort());
586             flowStatistics.setPriority(map.getPriority());
587             flowStatistics.setStrict(map.isStrict());
588             flowStatistics.setTableId(tableId);
589
590             flowStatisticsData.setFlowStatistics(flowStatistics.build());
591
592             logger.debug("Flow : {}",flowRule.toString());
593             logger.debug("Statistics to augment : {}",flowStatistics.build().toString());
594
595             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
596                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
597
598             Table table= (Table)trans.readConfigurationData(tableRef);
599
600             //TODO: Not a good way to do it, need to figure out better way.
601             //TODO: major issue in any alternate approach is that flow key is incrementally assigned
602             //to the flows stored in data store.
603             // Augment same statistics to all the matching masked flow
604             if(table != null){
605
606                 for(Flow existingFlow : table.getFlow()){
607                     logger.debug("Existing flow in data store : {}",existingFlow.toString());
608                     if(FlowComparator.flowEquals(flowRule,existingFlow)){
609                         InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
610                                 .augmentation(FlowCapableNode.class)
611                                 .child(Table.class, new TableKey(tableId))
612                                 .child(Flow.class,existingFlow.getKey()).toInstance();
613                         flowBuilder.setKey(existingFlow.getKey());
614                         flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
615                         logger.debug("Found matching flow in the datastore, augmenting statistics");
616                         foundOriginalFlow = true;
617                         // Update entry with timestamp of latest response
618                         flow.setKey(existingFlow.getKey());
619                         FlowEntry flowStatsEntry = new FlowEntry(tableId,flow.build());
620                         flowStatsUpdate.put(flowStatsEntry, expiryTime);
621
622                         trans.putOperationalData(flowRef, flowBuilder.build());
623                     }
624                 }
625             }
626
627             table = (Table)trans.readOperationalData(tableRef);
628             if(!foundOriginalFlow && table != null){
629
630                 for(Flow existingFlow : table.getFlow()){
631                     FlowStatisticsData augmentedflowStatisticsData = existingFlow.getAugmentation(FlowStatisticsData.class);
632                     if(augmentedflowStatisticsData != null){
633                         FlowBuilder existingOperationalFlow = new FlowBuilder();
634                         existingOperationalFlow.fieldsFrom(augmentedflowStatisticsData.getFlowStatistics());
635                         logger.debug("Existing unaccounted flow in operational data store : {}",existingFlow.toString());
636                         if(FlowComparator.flowEquals(flowRule,existingOperationalFlow.build())){
637                             InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
638                                     .augmentation(FlowCapableNode.class)
639                                     .child(Table.class, new TableKey(tableId))
640                                     .child(Flow.class,existingFlow.getKey()).toInstance();
641                             flowBuilder.setKey(existingFlow.getKey());
642                             flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
643                             logger.debug("Found matching unaccounted flow in the operational datastore, augmenting statistics");
644                             foundOriginalFlow = true;
645
646                             // Update entry with timestamp of latest response
647                             flow.setKey(existingFlow.getKey());
648                             FlowEntry flowStatsEntry = new FlowEntry(tableId,flow.build());
649                             flowStatsUpdate.put(flowStatsEntry, expiryTime);
650                             trans.putOperationalData(flowRef, flowBuilder.build());
651                             break;
652                         }
653                     }
654                 }
655             }
656             if(!foundOriginalFlow){
657                 String flowKey = "#UF$TABLE*"+Short.toString(tableId)+"*"+Integer.toString(this.unaccountedFlowsCounter);
658                 this.unaccountedFlowsCounter++;
659                 FlowKey newFlowKey = new FlowKey(new FlowId(flowKey));
660                 InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
661                         .augmentation(FlowCapableNode.class)
662                         .child(Table.class, new TableKey(tableId))
663                         .child(Flow.class,newFlowKey).toInstance();
664                 flowBuilder.setKey(newFlowKey);
665                 flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
666                 logger.debug("Flow {} is not present in config data store, augmenting statistics as an unaccounted flow",
667                         flowBuilder.build());
668
669                 // Update entry with timestamp of latest response
670                 flow.setKey(newFlowKey);
671                 FlowEntry flowStatsEntry = new FlowEntry(tableId,flow.build());
672                 flowStatsUpdate.put(flowStatsEntry, expiryTime);
673                 trans.putOperationalData(flowRef, flowBuilder.build());
674             }
675         }
676
677         trans.commit();
678     }
679
680     private static Long getExpiryTime(){
681         final long now = System.nanoTime();
682         return now + TimeUnit.MILLISECONDS.toNanos(StatisticsProvider.STATS_THREAD_EXECUTION_TIME * NUMBER_OF_WAIT_CYCLES);
683     }
684
685     public synchronized void cleanStaleStatistics(){
686         final DataModificationTransaction trans = this.statisticsProvider.startChange();
687         final long now = System.nanoTime();
688
689         //Clean stale statistics related to group
690         for (Iterator<Entry<GroupDescStats, Long>> it = this.groupDescStatsUpdate.entrySet().iterator();it.hasNext();){
691             Entry<GroupDescStats, Long> e = it.next();
692             if (now > e.getValue()) {
693                 cleanGroupStatsFromDataStore(trans, e.getKey());
694                 it.remove();
695             }
696         }
697
698         //Clean stale statistics related to meter
699         for (Iterator<Entry<MeterConfigStats, Long>> it = this.meterConfigStatsUpdate.entrySet().iterator();it.hasNext();){
700             Entry<MeterConfigStats, Long> e = it.next();
701             if (now > e.getValue()) {
702                 cleanMeterStatsFromDataStore(trans, e.getKey());
703                 it.remove();
704             }
705         }
706
707         //Clean stale statistics related to flow
708         for (Iterator<Entry<FlowEntry, Long>> it = this.flowStatsUpdate.entrySet().iterator();it.hasNext();){
709             Entry<FlowEntry, Long> e = it.next();
710             if (now > e.getValue()) {
711                 cleanFlowStatsFromDataStore(trans, e.getKey());
712                 it.remove();
713             }
714         }
715
716         //Clean stale statistics related to queue
717         for (Iterator<Entry<QueueEntry, Long>> it = this.queuesStatsUpdate.entrySet().iterator();it.hasNext();){
718             Entry<QueueEntry, Long> e = it.next();
719             if (now > e.getValue()) {
720                 cleanQueueStatsFromDataStore(trans, e.getKey());
721                 it.remove();
722             }
723         }
724
725         trans.commit();
726     }
727
728     private void cleanQueueStatsFromDataStore(DataModificationTransaction trans, QueueEntry queueEntry) {
729         InstanceIdentifier<?> queueRef
730                         = InstanceIdentifier.builder(Nodes.class)
731                                             .child(Node.class, this.targetNodeKey)
732                                             .child(NodeConnector.class, new NodeConnectorKey(queueEntry.getNodeConnectorId()))
733                                             .augmentation(FlowCapableNodeConnector.class)
734                                             .child(Queue.class, new QueueKey(queueEntry.getQueueId()))
735                                             .augmentation(FlowCapableNodeConnectorQueueStatisticsData.class).toInstance();
736         trans.removeOperationalData(queueRef);
737     }
738
739     private void cleanFlowStatsFromDataStore(DataModificationTransaction trans, FlowEntry flowEntry) {
740         InstanceIdentifier<?> flowRef
741                         = InstanceIdentifier.builder(Nodes.class).child(Node.class, this.targetNodeKey)
742                                             .augmentation(FlowCapableNode.class)
743                                             .child(Table.class, new TableKey(flowEntry.getTableId()))
744                                             .child(Flow.class,flowEntry.getFlow().getKey())
745                                             .augmentation(FlowStatisticsData.class).toInstance();
746         trans.removeOperationalData(flowRef);
747     }
748
749     private void cleanMeterStatsFromDataStore(DataModificationTransaction trans, MeterConfigStats meterConfigStats) {
750         InstanceIdentifierBuilder<Meter> meterRef
751                         = InstanceIdentifier.builder(Nodes.class).child(Node.class,this.targetNodeKey)
752                                             .augmentation(FlowCapableNode.class)
753                                             .child(Meter.class,new MeterKey(meterConfigStats.getMeterId()));
754
755         InstanceIdentifier<?> nodeMeterConfigStatsAugmentation = meterRef.augmentation(NodeMeterConfigStats.class).toInstance();
756         trans.removeOperationalData(nodeMeterConfigStatsAugmentation);
757
758         InstanceIdentifier<?> nodeMeterStatisticsAugmentation = meterRef.augmentation(NodeMeterStatistics.class).toInstance();
759         trans.removeOperationalData(nodeMeterStatisticsAugmentation);
760     }
761
762     private void cleanGroupStatsFromDataStore(DataModificationTransaction trans, GroupDescStats groupDescStats) {
763         InstanceIdentifierBuilder<Group> groupRef
764                         = InstanceIdentifier.builder(Nodes.class).child(Node.class,this.targetNodeKey)
765                                             .augmentation(FlowCapableNode.class)
766                                             .child(Group.class,new GroupKey(groupDescStats.getGroupId()));
767
768         InstanceIdentifier<?> nodeGroupDescStatsAugmentation = groupRef.augmentation(NodeGroupDescStats.class).toInstance();
769         trans.removeOperationalData(nodeGroupDescStatsAugmentation);
770
771         InstanceIdentifier<?> nodeGroupStatisticsAugmentation = groupRef.augmentation(NodeGroupStatistics.class).toInstance();
772         trans.removeOperationalData(nodeGroupStatisticsAugmentation);
773     }
774
775     @Override
776     public void close() {
777         // FIXME: cleanup any resources we hold (registrations, etc.)
778         logger.debug("Statistics handler for {} shut down", targetNodeKey.getId());
779     }
780 }