Centralize targetNodeRef in the NodeStatisticsHandler
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / NodeStatisticsHandler.java
1 /*
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.statistics.manager;
9
10 import java.util.Collection;
11 import java.util.Collections;
12 import java.util.HashMap;
13 import java.util.HashSet;
14 import java.util.Iterator;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.Map.Entry;
18 import java.util.Set;
19 import java.util.concurrent.TimeUnit;
20
21 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
22 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsData;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsDataBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatistics;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatisticsBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueKey;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStatsBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeaturesBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatisticsBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.desc.GroupDescBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeaturesBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.statistics.GroupStatisticsBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupFeatures;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStats;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStatsBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeatures;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeaturesBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatisticsBuilder;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeaturesBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterConfigStatsBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterStatisticsBuilder;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterFeatures;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.AggregateFlowStatistics;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.flow.capable.node.connector.statistics.FlowCapableNodeConnectorStatisticsBuilder;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsDataBuilder;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.flow.capable.node.connector.queue.statistics.FlowCapableNodeConnectorQueueStatisticsBuilder;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.queue.id.and.statistics.map.QueueIdAndStatisticsMap;
98 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
99 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
100 import org.slf4j.Logger;
101 import org.slf4j.LoggerFactory;
102
103 import com.google.common.base.Preconditions;
104
105 /**
106  * This class handles the lifecycle of per-node statistics. It receives data
107  * from StatisticsListener, stores it in the data store and keeps track of
108  * when the data should be removed.
109  *
110  * @author avishnoi@in.ibm.com
111  */
112 public final class NodeStatisticsHandler implements AutoCloseable {
113     private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class);
114     private static final int NUMBER_OF_WAIT_CYCLES = 2;
115
116     private final Map<GroupDescStats,Long> groupDescStatsUpdate = new HashMap<>();
117     private final Map<MeterConfigStats,Long> meterConfigStatsUpdate = new HashMap<>();
118     private final Map<FlowEntry,Long> flowStatsUpdate = new HashMap<>();
119     private final Map<QueueEntry,Long> queuesStatsUpdate = new HashMap<>();
120     private final InstanceIdentifier<Node> targetNodeIdentifier;
121     private final DataProviderService dps;
122     private final NodeRef targetNodeRef;
123     private final NodeKey targetNodeKey;
124     private Collection<TableKey> knownTables = Collections.emptySet();
125     private int unaccountedFlowsCounter = 1;
126
127     public NodeStatisticsHandler(final DataProviderService dps, final NodeKey nodeKey) {
128         this.dps = Preconditions.checkNotNull(dps);
129         this.targetNodeKey = Preconditions.checkNotNull(nodeKey);
130         this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
131         this.targetNodeRef = new NodeRef(targetNodeIdentifier);
132     }
133
134     private static class FlowEntry {
135         private final Short tableId;
136         private final Flow flow;
137
138         public FlowEntry(Short tableId, Flow flow){
139             this.tableId = tableId;
140             this.flow = flow;
141         }
142
143         public Short getTableId() {
144             return tableId;
145         }
146
147         public Flow getFlow() {
148             return flow;
149         }
150
151         @Override
152         public int hashCode() {
153             final int prime = 31;
154             int result = 1;
155             result = prime * result + ((flow == null) ? 0 : flow.hashCode());
156             result = prime * result + ((tableId == null) ? 0 : tableId.hashCode());
157             return result;
158         }
159
160         @Override
161         public boolean equals(Object obj) {
162             if (this == obj)
163                 return true;
164             if (obj == null)
165                 return false;
166             if (getClass() != obj.getClass())
167                 return false;
168             FlowEntry other = (FlowEntry) obj;
169             if (flow == null) {
170                 if (other.flow != null)
171                     return false;
172             } else if (!flow.equals(other.flow))
173                 return false;
174             if (tableId == null) {
175                 if (other.tableId != null)
176                     return false;
177             } else if (!tableId.equals(other.tableId))
178                 return false;
179             return true;
180         }
181     }
182
183     private static final class QueueEntry {
184         private final NodeConnectorId nodeConnectorId;
185         private final QueueId queueId;
186         public QueueEntry(NodeConnectorId ncId, QueueId queueId){
187             this.nodeConnectorId = ncId;
188             this.queueId = queueId;
189         }
190         public NodeConnectorId getNodeConnectorId() {
191             return nodeConnectorId;
192         }
193         public QueueId getQueueId() {
194             return queueId;
195         }
196         @Override
197         public int hashCode() {
198             final int prime = 31;
199             int result = 1;
200             result = prime * result + ((nodeConnectorId == null) ? 0 : nodeConnectorId.hashCode());
201             result = prime * result + ((queueId == null) ? 0 : queueId.hashCode());
202             return result;
203         }
204         @Override
205         public boolean equals(Object obj) {
206             if (this == obj) {
207                 return true;
208             }
209             if (obj == null) {
210                 return false;
211             }
212             if (!(obj instanceof QueueEntry)) {
213                 return false;
214             }
215             QueueEntry other = (QueueEntry) obj;
216             if (nodeConnectorId == null) {
217                 if (other.nodeConnectorId != null) {
218                     return false;
219                 }
220             } else if (!nodeConnectorId.equals(other.nodeConnectorId)) {
221                 return false;
222             }
223             if (queueId == null) {
224                 if (other.queueId != null) {
225                     return false;
226                 }
227             } else if (!queueId.equals(other.queueId)) {
228                 return false;
229             }
230             return true;
231         }
232     }
233
234     public NodeKey getTargetNodeKey() {
235         return targetNodeKey;
236     }
237
238     public Collection<TableKey> getKnownTables() {
239         return knownTables;
240     }
241
242     public InstanceIdentifier<Node> getTargetNodeIdentifier() {
243         return targetNodeIdentifier;
244     }
245
246     public NodeRef getTargetNodeRef() {
247         return targetNodeRef;
248     }
249
250     public synchronized void updateGroupDescStats(List<GroupDescStats> list){
251         final Long expiryTime = getExpiryTime();
252         final DataModificationTransaction trans = dps.beginTransaction();
253
254         for (GroupDescStats groupDescStats : list) {
255             GroupBuilder groupBuilder = new GroupBuilder();
256             GroupKey groupKey = new GroupKey(groupDescStats.getGroupId());
257             groupBuilder.setKey(groupKey);
258
259             InstanceIdentifier<Group> groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
260                                                                                         .augmentation(FlowCapableNode.class)
261                                                                                         .child(Group.class,groupKey).toInstance();
262
263             NodeGroupDescStatsBuilder groupDesc= new NodeGroupDescStatsBuilder();
264             GroupDescBuilder stats = new GroupDescBuilder();
265             stats.fieldsFrom(groupDescStats);
266             groupDesc.setGroupDesc(stats.build());
267
268             //Update augmented data
269             groupBuilder.addAugmentation(NodeGroupDescStats.class, groupDesc.build());
270
271             trans.putOperationalData(groupRef, groupBuilder.build());
272             this.groupDescStatsUpdate.put(groupDescStats, expiryTime);
273         }
274
275         trans.commit();
276     }
277
278     public synchronized void updateGroupStats(List<GroupStats> list) {
279         final DataModificationTransaction trans = dps.beginTransaction();
280
281         for(GroupStats groupStats : list) {
282             GroupBuilder groupBuilder = new GroupBuilder();
283             GroupKey groupKey = new GroupKey(groupStats.getGroupId());
284             groupBuilder.setKey(groupKey);
285
286             InstanceIdentifier<Group> groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
287                                                                                         .augmentation(FlowCapableNode.class)
288                                                                                         .child(Group.class,groupKey).toInstance();
289
290             NodeGroupStatisticsBuilder groupStatisticsBuilder= new NodeGroupStatisticsBuilder();
291             GroupStatisticsBuilder stats = new GroupStatisticsBuilder();
292             stats.fieldsFrom(groupStats);
293             groupStatisticsBuilder.setGroupStatistics(stats.build());
294
295             //Update augmented data
296             groupBuilder.addAugmentation(NodeGroupStatistics.class, groupStatisticsBuilder.build());
297             trans.putOperationalData(groupRef, groupBuilder.build());
298
299             // FIXME: should we be tracking this data?
300         }
301
302         trans.commit();
303     }
304
305     public synchronized void updateMeterConfigStats(List<MeterConfigStats> list) {
306         final Long expiryTime = getExpiryTime();
307         final DataModificationTransaction trans = dps.beginTransaction();
308
309         for(MeterConfigStats meterConfigStats : list) {
310             MeterBuilder meterBuilder = new MeterBuilder();
311             MeterKey meterKey = new MeterKey(meterConfigStats.getMeterId());
312             meterBuilder.setKey(meterKey);
313
314             InstanceIdentifier<Meter> meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
315                                                                                         .augmentation(FlowCapableNode.class)
316                                                                                         .child(Meter.class,meterKey).toInstance();
317
318             NodeMeterConfigStatsBuilder meterConfig= new NodeMeterConfigStatsBuilder();
319             MeterConfigStatsBuilder stats = new MeterConfigStatsBuilder();
320             stats.fieldsFrom(meterConfigStats);
321             meterConfig.setMeterConfigStats(stats.build());
322
323             //Update augmented data
324             meterBuilder.addAugmentation(NodeMeterConfigStats.class, meterConfig.build());
325
326             trans.putOperationalData(meterRef, meterBuilder.build());
327             this.meterConfigStatsUpdate.put(meterConfigStats, expiryTime);
328         }
329
330         trans.commit();
331     }
332
333
334     public synchronized void updateMeterStats(List<MeterStats> list) {
335         final DataModificationTransaction trans = dps.beginTransaction();
336
337         for(MeterStats meterStats : list) {
338             MeterBuilder meterBuilder = new MeterBuilder();
339             MeterKey meterKey = new MeterKey(meterStats.getMeterId());
340             meterBuilder.setKey(meterKey);
341
342             InstanceIdentifier<Meter> meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
343                                                                                         .augmentation(FlowCapableNode.class)
344                                                                                         .child(Meter.class,meterKey).toInstance();
345
346             NodeMeterStatisticsBuilder meterStatsBuilder= new NodeMeterStatisticsBuilder();
347             MeterStatisticsBuilder stats = new MeterStatisticsBuilder();
348             stats.fieldsFrom(meterStats);
349             meterStatsBuilder.setMeterStatistics(stats.build());
350
351             //Update augmented data
352             meterBuilder.addAugmentation(NodeMeterStatistics.class, meterStatsBuilder.build());
353             trans.putOperationalData(meterRef, meterBuilder.build());
354
355             // FIXME: should we be tracking this data?
356         }
357
358         trans.commit();
359     }
360
361     public synchronized void updateQueueStats(List<QueueIdAndStatisticsMap> list) {
362         final Long expiryTime = getExpiryTime();
363         final DataModificationTransaction trans = dps.beginTransaction();
364
365         for (QueueIdAndStatisticsMap swQueueStats : list) {
366
367             QueueEntry queueEntry = new QueueEntry(swQueueStats.getNodeConnectorId(),swQueueStats.getQueueId());
368
369             FlowCapableNodeConnectorQueueStatisticsDataBuilder queueStatisticsDataBuilder = new FlowCapableNodeConnectorQueueStatisticsDataBuilder();
370
371             FlowCapableNodeConnectorQueueStatisticsBuilder queueStatisticsBuilder = new FlowCapableNodeConnectorQueueStatisticsBuilder();
372
373             queueStatisticsBuilder.fieldsFrom(swQueueStats);
374
375             queueStatisticsDataBuilder.setFlowCapableNodeConnectorQueueStatistics(queueStatisticsBuilder.build());
376
377             InstanceIdentifier<Queue> queueRef
378                     = InstanceIdentifier.builder(Nodes.class)
379                                         .child(Node.class, targetNodeKey)
380                                         .child(NodeConnector.class, new NodeConnectorKey(swQueueStats.getNodeConnectorId()))
381                                         .augmentation(FlowCapableNodeConnector.class)
382                                         .child(Queue.class, new QueueKey(swQueueStats.getQueueId())).toInstance();
383
384             QueueBuilder queueBuilder = new QueueBuilder();
385             FlowCapableNodeConnectorQueueStatisticsData qsd = queueStatisticsDataBuilder.build();
386             queueBuilder.addAugmentation(FlowCapableNodeConnectorQueueStatisticsData.class, qsd);
387             queueBuilder.setKey(new QueueKey(swQueueStats.getQueueId()));
388
389             logger.debug("Augmenting queue statistics {} of queue {} to port {}",
390                                         qsd,
391                                         swQueueStats.getQueueId(),
392                                         swQueueStats.getNodeConnectorId());
393
394             trans.putOperationalData(queueRef, queueBuilder.build());
395             this.queuesStatsUpdate.put(queueEntry, expiryTime);
396         }
397
398         trans.commit();
399     }
400
401     public synchronized void updateFlowTableStats(List<FlowTableAndStatisticsMap> list) {
402         final DataModificationTransaction trans = dps.beginTransaction();
403
404         final Set<TableKey> knownTables = new HashSet<>(list.size());
405         for (FlowTableAndStatisticsMap ftStats : list) {
406
407             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
408                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(ftStats.getTableId().getValue())).toInstance();
409
410             FlowTableStatisticsDataBuilder statisticsDataBuilder = new FlowTableStatisticsDataBuilder();
411             final FlowTableStatistics stats = new FlowTableStatisticsBuilder(ftStats).build();
412             statisticsDataBuilder.setFlowTableStatistics(stats);
413
414             logger.debug("Augment flow table statistics: {} for table {} on Node {}",
415                     stats,ftStats.getTableId(), targetNodeKey);
416
417             TableBuilder tableBuilder = new TableBuilder();
418             tableBuilder.setKey(new TableKey(ftStats.getTableId().getValue()));
419             tableBuilder.addAugmentation(FlowTableStatisticsData.class, statisticsDataBuilder.build());
420             trans.putOperationalData(tableRef, tableBuilder.build());
421
422             knownTables.add(tableBuilder.getKey());
423         }
424
425         this.knownTables = Collections.unmodifiableCollection(knownTables);
426         trans.commit();
427     }
428
429     public synchronized void updateNodeConnectorStats(List<NodeConnectorStatisticsAndPortNumberMap> list) {
430         final DataModificationTransaction trans = dps.beginTransaction();
431
432         for(NodeConnectorStatisticsAndPortNumberMap portStats : list) {
433
434             FlowCapableNodeConnectorStatisticsBuilder statisticsBuilder
435                                             = new FlowCapableNodeConnectorStatisticsBuilder();
436             statisticsBuilder.setBytes(portStats.getBytes());
437             statisticsBuilder.setCollisionCount(portStats.getCollisionCount());
438             statisticsBuilder.setDuration(portStats.getDuration());
439             statisticsBuilder.setPackets(portStats.getPackets());
440             statisticsBuilder.setReceiveCrcError(portStats.getReceiveCrcError());
441             statisticsBuilder.setReceiveDrops(portStats.getReceiveDrops());
442             statisticsBuilder.setReceiveErrors(portStats.getReceiveErrors());
443             statisticsBuilder.setReceiveFrameError(portStats.getReceiveFrameError());
444             statisticsBuilder.setReceiveOverRunError(portStats.getReceiveOverRunError());
445             statisticsBuilder.setTransmitDrops(portStats.getTransmitDrops());
446             statisticsBuilder.setTransmitErrors(portStats.getTransmitErrors());
447
448             //Augment data to the node-connector
449             FlowCapableNodeConnectorStatisticsDataBuilder statisticsDataBuilder =
450                     new FlowCapableNodeConnectorStatisticsDataBuilder();
451
452             statisticsDataBuilder.setFlowCapableNodeConnectorStatistics(statisticsBuilder.build());
453
454             InstanceIdentifier<NodeConnector> nodeConnectorRef = InstanceIdentifier.builder(Nodes.class)
455                     .child(Node.class, targetNodeKey)
456                     .child(NodeConnector.class, new NodeConnectorKey(portStats.getNodeConnectorId())).toInstance();
457
458             // FIXME: can we bypass this read?
459             NodeConnector nodeConnector = (NodeConnector)trans.readOperationalData(nodeConnectorRef);
460             if(nodeConnector != null){
461                 final FlowCapableNodeConnectorStatisticsData stats = statisticsDataBuilder.build();
462                 logger.debug("Augmenting port statistics {} to port {}",stats,nodeConnectorRef.toString());
463                 NodeConnectorBuilder nodeConnectorBuilder = new NodeConnectorBuilder();
464                 nodeConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, stats);
465                 trans.putOperationalData(nodeConnectorRef, nodeConnectorBuilder.build());
466             }
467
468             // FIXME: should we be tracking this data?
469         }
470
471         trans.commit();
472     }
473
474     public synchronized void updateAggregateFlowStats(Short tableId, AggregateFlowStatistics flowStats) {
475         if (tableId != null) {
476             final DataModificationTransaction trans = dps.beginTransaction();
477
478
479             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
480                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
481
482             AggregateFlowStatisticsDataBuilder aggregateFlowStatisticsDataBuilder = new AggregateFlowStatisticsDataBuilder();
483             AggregateFlowStatisticsBuilder aggregateFlowStatisticsBuilder = new AggregateFlowStatisticsBuilder(flowStats);
484
485             aggregateFlowStatisticsDataBuilder.setAggregateFlowStatistics(aggregateFlowStatisticsBuilder.build());
486
487             logger.debug("Augment aggregate statistics: {} for table {} on Node {}",
488                     aggregateFlowStatisticsBuilder.build().toString(),tableId,targetNodeKey);
489
490             TableBuilder tableBuilder = new TableBuilder();
491             tableBuilder.setKey(new TableKey(tableId));
492             tableBuilder.addAugmentation(AggregateFlowStatisticsData.class, aggregateFlowStatisticsDataBuilder.build());
493             trans.putOperationalData(tableRef, tableBuilder.build());
494
495             // FIXME: should we be tracking this data?
496             trans.commit();
497         }
498     }
499
500     public synchronized void updateGroupFeatures(GroupFeatures notification) {
501         final DataModificationTransaction trans = dps.beginTransaction();
502
503         final NodeBuilder nodeData = new NodeBuilder();
504         nodeData.setKey(targetNodeKey);
505
506         NodeGroupFeaturesBuilder nodeGroupFeatures = new NodeGroupFeaturesBuilder();
507         GroupFeaturesBuilder groupFeatures = new GroupFeaturesBuilder(notification);
508         nodeGroupFeatures.setGroupFeatures(groupFeatures.build());
509
510         //Update augmented data
511         nodeData.addAugmentation(NodeGroupFeatures.class, nodeGroupFeatures.build());
512         trans.putOperationalData(targetNodeIdentifier, nodeData.build());
513
514         // FIXME: should we be tracking this data?
515         trans.commit();
516     }
517
518     public synchronized void updateMeterFeatures(MeterFeatures features) {
519         final DataModificationTransaction trans = dps.beginTransaction();
520
521         final NodeBuilder nodeData = new NodeBuilder();
522         nodeData.setKey(targetNodeKey);
523
524         NodeMeterFeaturesBuilder nodeMeterFeatures = new NodeMeterFeaturesBuilder();
525         MeterFeaturesBuilder meterFeature = new MeterFeaturesBuilder(features);
526         nodeMeterFeatures.setMeterFeatures(meterFeature.build());
527
528         //Update augmented data
529         nodeData.addAugmentation(NodeMeterFeatures.class, nodeMeterFeatures.build());
530         trans.putOperationalData(targetNodeIdentifier, nodeData.build());
531
532         // FIXME: should we be tracking this data?
533         trans.commit();
534     }
535
536     public synchronized void updateFlowStats(List<FlowAndStatisticsMapList> list) {
537         final Long expiryTime = getExpiryTime();
538         final DataModificationTransaction trans = dps.beginTransaction();
539
540         for(FlowAndStatisticsMapList map : list) {
541             short tableId = map.getTableId();
542             boolean foundOriginalFlow = false;
543
544             FlowBuilder flowBuilder = new FlowBuilder();
545
546             FlowStatisticsDataBuilder flowStatisticsData = new FlowStatisticsDataBuilder();
547
548             FlowBuilder flow = new FlowBuilder();
549             flow.setContainerName(map.getContainerName());
550             flow.setBufferId(map.getBufferId());
551             flow.setCookie(map.getCookie());
552             flow.setCookieMask(map.getCookieMask());
553             flow.setFlags(map.getFlags());
554             flow.setFlowName(map.getFlowName());
555             flow.setHardTimeout(map.getHardTimeout());
556             if(map.getFlowId() != null)
557                 flow.setId(new FlowId(map.getFlowId().getValue()));
558             flow.setIdleTimeout(map.getIdleTimeout());
559             flow.setInstallHw(map.isInstallHw());
560             flow.setInstructions(map.getInstructions());
561             if(map.getFlowId()!= null)
562                 flow.setKey(new FlowKey(new FlowId(map.getKey().getFlowId().getValue())));
563             flow.setMatch(map.getMatch());
564             flow.setOutGroup(map.getOutGroup());
565             flow.setOutPort(map.getOutPort());
566             flow.setPriority(map.getPriority());
567             flow.setStrict(map.isStrict());
568             flow.setTableId(tableId);
569
570             Flow flowRule = flow.build();
571
572             FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder();
573             stats.setByteCount(map.getByteCount());
574             stats.setPacketCount(map.getPacketCount());
575             stats.setDuration(map.getDuration());
576
577             GenericStatistics flowStats = stats.build();
578
579             //Augment the data to the flow node
580
581             FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder();
582             flowStatistics.setByteCount(flowStats.getByteCount());
583             flowStatistics.setPacketCount(flowStats.getPacketCount());
584             flowStatistics.setDuration(flowStats.getDuration());
585             flowStatistics.setContainerName(map.getContainerName());
586             flowStatistics.setBufferId(map.getBufferId());
587             flowStatistics.setCookie(map.getCookie());
588             flowStatistics.setCookieMask(map.getCookieMask());
589             flowStatistics.setFlags(map.getFlags());
590             flowStatistics.setFlowName(map.getFlowName());
591             flowStatistics.setHardTimeout(map.getHardTimeout());
592             flowStatistics.setIdleTimeout(map.getIdleTimeout());
593             flowStatistics.setInstallHw(map.isInstallHw());
594             flowStatistics.setInstructions(map.getInstructions());
595             flowStatistics.setMatch(map.getMatch());
596             flowStatistics.setOutGroup(map.getOutGroup());
597             flowStatistics.setOutPort(map.getOutPort());
598             flowStatistics.setPriority(map.getPriority());
599             flowStatistics.setStrict(map.isStrict());
600             flowStatistics.setTableId(tableId);
601
602             flowStatisticsData.setFlowStatistics(flowStatistics.build());
603
604             logger.debug("Flow : {}",flowRule.toString());
605             logger.debug("Statistics to augment : {}",flowStatistics.build().toString());
606
607             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
608                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
609
610             Table table= (Table)trans.readConfigurationData(tableRef);
611
612             //TODO: Not a good way to do it, need to figure out better way.
613             //TODO: major issue in any alternate approach is that flow key is incrementally assigned
614             //to the flows stored in data store.
615             // Augment same statistics to all the matching masked flow
616             if(table != null){
617
618                 for(Flow existingFlow : table.getFlow()){
619                     logger.debug("Existing flow in data store : {}",existingFlow.toString());
620                     if(FlowComparator.flowEquals(flowRule,existingFlow)){
621                         InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
622                                 .augmentation(FlowCapableNode.class)
623                                 .child(Table.class, new TableKey(tableId))
624                                 .child(Flow.class,existingFlow.getKey()).toInstance();
625                         flowBuilder.setKey(existingFlow.getKey());
626                         flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
627                         logger.debug("Found matching flow in the datastore, augmenting statistics");
628                         foundOriginalFlow = true;
629                         // Update entry with timestamp of latest response
630                         flow.setKey(existingFlow.getKey());
631                         FlowEntry flowStatsEntry = new FlowEntry(tableId,flow.build());
632                         flowStatsUpdate.put(flowStatsEntry, expiryTime);
633
634                         trans.putOperationalData(flowRef, flowBuilder.build());
635                     }
636                 }
637             }
638
639             table = (Table)trans.readOperationalData(tableRef);
640             if(!foundOriginalFlow && table != null){
641
642                 for(Flow existingFlow : table.getFlow()){
643                     FlowStatisticsData augmentedflowStatisticsData = existingFlow.getAugmentation(FlowStatisticsData.class);
644                     if(augmentedflowStatisticsData != null){
645                         FlowBuilder existingOperationalFlow = new FlowBuilder();
646                         existingOperationalFlow.fieldsFrom(augmentedflowStatisticsData.getFlowStatistics());
647                         logger.debug("Existing unaccounted flow in operational data store : {}",existingFlow.toString());
648                         if(FlowComparator.flowEquals(flowRule,existingOperationalFlow.build())){
649                             InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
650                                     .augmentation(FlowCapableNode.class)
651                                     .child(Table.class, new TableKey(tableId))
652                                     .child(Flow.class,existingFlow.getKey()).toInstance();
653                             flowBuilder.setKey(existingFlow.getKey());
654                             flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
655                             logger.debug("Found matching unaccounted flow in the operational datastore, augmenting statistics");
656                             foundOriginalFlow = true;
657
658                             // Update entry with timestamp of latest response
659                             flow.setKey(existingFlow.getKey());
660                             FlowEntry flowStatsEntry = new FlowEntry(tableId,flow.build());
661                             flowStatsUpdate.put(flowStatsEntry, expiryTime);
662                             trans.putOperationalData(flowRef, flowBuilder.build());
663                             break;
664                         }
665                     }
666                 }
667             }
668             if(!foundOriginalFlow){
669                 String flowKey = "#UF$TABLE*"+Short.toString(tableId)+"*"+Integer.toString(this.unaccountedFlowsCounter);
670                 this.unaccountedFlowsCounter++;
671                 FlowKey newFlowKey = new FlowKey(new FlowId(flowKey));
672                 InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
673                         .augmentation(FlowCapableNode.class)
674                         .child(Table.class, new TableKey(tableId))
675                         .child(Flow.class,newFlowKey).toInstance();
676                 flowBuilder.setKey(newFlowKey);
677                 flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
678                 logger.debug("Flow {} is not present in config data store, augmenting statistics as an unaccounted flow",
679                         flowBuilder.build());
680
681                 // Update entry with timestamp of latest response
682                 flow.setKey(newFlowKey);
683                 FlowEntry flowStatsEntry = new FlowEntry(tableId,flow.build());
684                 flowStatsUpdate.put(flowStatsEntry, expiryTime);
685                 trans.putOperationalData(flowRef, flowBuilder.build());
686             }
687         }
688
689         trans.commit();
690     }
691
692     private static Long getExpiryTime(){
693         final long now = System.nanoTime();
694         return now + TimeUnit.MILLISECONDS.toNanos(StatisticsProvider.STATS_THREAD_EXECUTION_TIME * NUMBER_OF_WAIT_CYCLES);
695     }
696
697     public synchronized void cleanStaleStatistics(){
698         final DataModificationTransaction trans = dps.beginTransaction();
699         final long now = System.nanoTime();
700
701         //Clean stale statistics related to group
702         for (Iterator<Entry<GroupDescStats, Long>> it = this.groupDescStatsUpdate.entrySet().iterator();it.hasNext();){
703             Entry<GroupDescStats, Long> e = it.next();
704             if (now > e.getValue()) {
705                 cleanGroupStatsFromDataStore(trans, e.getKey());
706                 it.remove();
707             }
708         }
709
710         //Clean stale statistics related to meter
711         for (Iterator<Entry<MeterConfigStats, Long>> it = this.meterConfigStatsUpdate.entrySet().iterator();it.hasNext();){
712             Entry<MeterConfigStats, Long> e = it.next();
713             if (now > e.getValue()) {
714                 cleanMeterStatsFromDataStore(trans, e.getKey());
715                 it.remove();
716             }
717         }
718
719         //Clean stale statistics related to flow
720         for (Iterator<Entry<FlowEntry, Long>> it = this.flowStatsUpdate.entrySet().iterator();it.hasNext();){
721             Entry<FlowEntry, Long> e = it.next();
722             if (now > e.getValue()) {
723                 cleanFlowStatsFromDataStore(trans, e.getKey());
724                 it.remove();
725             }
726         }
727
728         //Clean stale statistics related to queue
729         for (Iterator<Entry<QueueEntry, Long>> it = this.queuesStatsUpdate.entrySet().iterator();it.hasNext();){
730             Entry<QueueEntry, Long> e = it.next();
731             if (now > e.getValue()) {
732                 cleanQueueStatsFromDataStore(trans, e.getKey());
733                 it.remove();
734             }
735         }
736
737         trans.commit();
738     }
739
740     private void cleanQueueStatsFromDataStore(DataModificationTransaction trans, QueueEntry queueEntry) {
741         InstanceIdentifier<?> queueRef
742                         = InstanceIdentifier.builder(Nodes.class)
743                                             .child(Node.class, this.targetNodeKey)
744                                             .child(NodeConnector.class, new NodeConnectorKey(queueEntry.getNodeConnectorId()))
745                                             .augmentation(FlowCapableNodeConnector.class)
746                                             .child(Queue.class, new QueueKey(queueEntry.getQueueId()))
747                                             .augmentation(FlowCapableNodeConnectorQueueStatisticsData.class).toInstance();
748         trans.removeOperationalData(queueRef);
749     }
750
751     private void cleanFlowStatsFromDataStore(DataModificationTransaction trans, FlowEntry flowEntry) {
752         InstanceIdentifier<?> flowRef
753                         = InstanceIdentifier.builder(Nodes.class).child(Node.class, this.targetNodeKey)
754                                             .augmentation(FlowCapableNode.class)
755                                             .child(Table.class, new TableKey(flowEntry.getTableId()))
756                                             .child(Flow.class,flowEntry.getFlow().getKey())
757                                             .augmentation(FlowStatisticsData.class).toInstance();
758         trans.removeOperationalData(flowRef);
759     }
760
761     private void cleanMeterStatsFromDataStore(DataModificationTransaction trans, MeterConfigStats meterConfigStats) {
762         InstanceIdentifierBuilder<Meter> meterRef
763                         = InstanceIdentifier.builder(Nodes.class).child(Node.class,this.targetNodeKey)
764                                             .augmentation(FlowCapableNode.class)
765                                             .child(Meter.class,new MeterKey(meterConfigStats.getMeterId()));
766
767         InstanceIdentifier<?> nodeMeterConfigStatsAugmentation = meterRef.augmentation(NodeMeterConfigStats.class).toInstance();
768         trans.removeOperationalData(nodeMeterConfigStatsAugmentation);
769
770         InstanceIdentifier<?> nodeMeterStatisticsAugmentation = meterRef.augmentation(NodeMeterStatistics.class).toInstance();
771         trans.removeOperationalData(nodeMeterStatisticsAugmentation);
772     }
773
774     private void cleanGroupStatsFromDataStore(DataModificationTransaction trans, GroupDescStats groupDescStats) {
775         InstanceIdentifierBuilder<Group> groupRef
776                         = InstanceIdentifier.builder(Nodes.class).child(Node.class,this.targetNodeKey)
777                                             .augmentation(FlowCapableNode.class)
778                                             .child(Group.class,new GroupKey(groupDescStats.getGroupId()));
779
780         InstanceIdentifier<?> nodeGroupDescStatsAugmentation = groupRef.augmentation(NodeGroupDescStats.class).toInstance();
781         trans.removeOperationalData(nodeGroupDescStatsAugmentation);
782
783         InstanceIdentifier<?> nodeGroupStatisticsAugmentation = groupRef.augmentation(NodeGroupStatistics.class).toInstance();
784         trans.removeOperationalData(nodeGroupStatisticsAugmentation);
785     }
786
787     @Override
788     public void close() {
789         // FIXME: cleanup any resources we hold (registrations, etc.)
790         logger.debug("Statistics handler for {} shut down", targetNodeKey.getId());
791     }
792 }