Move flow statistics update handling
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / NodeStatisticsHandler.java
1 /*
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.statistics.manager;
9
10 import java.util.HashMap;
11 import java.util.Iterator;
12 import java.util.List;
13 import java.util.Map;
14 import java.util.Map.Entry;
15 import java.util.concurrent.TimeUnit;
16
17 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterBuilder;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsData;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsDataBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatistics;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatisticsBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueKey;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStatsBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeaturesBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatisticsBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.desc.GroupDescBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeaturesBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.statistics.GroupStatisticsBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupFeatures;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStats;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStatsBuilder;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeatures;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeaturesBuilder;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatisticsBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeaturesBuilder;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterConfigStatsBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterStatisticsBuilder;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterFeatures;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.AggregateFlowStatistics;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.flow.capable.node.connector.statistics.FlowCapableNodeConnectorStatisticsBuilder;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsDataBuilder;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.flow.capable.node.connector.queue.statistics.FlowCapableNodeConnectorQueueStatisticsBuilder;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.queue.id.and.statistics.map.QueueIdAndStatisticsMap;
92 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
93 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
96
97 import com.google.common.base.Preconditions;
98
99 /**
100  * This class handles the lifecycle of per-node statistics. It receives data
101  * from StatisticsListener, stores it in the data store and keeps track of
102  * when the data should be removed.
103  *
104  * @author avishnoi@in.ibm.com
105  */
106 public class NodeStatisticsHandler {
    /** SLF4J logger shared by all instances of this handler. */
    private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class);
    // NOTE(review): not referenced in the visible part of this file; presumably used when
    // computing expiry times in getExpiryTime() -- confirm against that method.
    private static final int NUMBER_OF_WAIT_CYCLES = 2;

    // Group descriptions written by updateGroupDescStats(), mapped to the getExpiryTime() value of that update.
    private final Map<GroupDescStats,Long> groupDescStatsUpdate = new HashMap<>();
    // Meter configurations written by updateMeterConfigStats(), mapped to the getExpiryTime() value of that update.
    private final Map<MeterConfigStats,Long> meterConfigStatsUpdate = new HashMap<>();
    // NOTE(review): presumably the flow counterpart of the maps above, filled by updateFlowStats() -- confirm.
    private final Map<FlowEntry,Long> flowStatsUpdate = new HashMap<>();
    // (node connector, queue) pairs written by updateQueueStats(), mapped to the getExpiryTime() value of that update.
    private final Map<QueueEntry,Long> queuesStatsUpdate = new HashMap<>();
    // Path of the target node (Nodes -> Node[targetNodeKey]); built once in the constructor.
    private final InstanceIdentifier<Node> targetNodeIdentifier;
    // Source of the data store transactions (startChange()) used by every update method.
    private final StatisticsProvider statisticsProvider;
    // Key of the node whose statistics this handler manages.
    private final NodeKey targetNodeKey;
    // NOTE(review): usage is outside the visible part of this file; the name suggests a sequence
    // number for flows that cannot be matched to a known flow -- confirm.
    private int unaccountedFlowsCounter = 1;
118
119     public NodeStatisticsHandler(StatisticsProvider statisticsProvider, NodeKey nodeKey){
120         this.statisticsProvider = Preconditions.checkNotNull(statisticsProvider);
121         this.targetNodeKey = Preconditions.checkNotNull(nodeKey);
122         this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
123     }
124
125     public class FlowEntry {
126         private final Short tableId;
127         private final Flow flow;
128
129         public FlowEntry(Short tableId, Flow flow){
130             this.tableId = tableId;
131             this.flow = flow;
132         }
133
134         public Short getTableId() {
135             return tableId;
136         }
137
138         public Flow getFlow() {
139             return flow;
140         }
141
142         @Override
143         public int hashCode() {
144             final int prime = 31;
145             int result = 1;
146             result = prime * result + getOuterType().hashCode();
147             result = prime * result + ((flow == null) ? 0 : flow.hashCode());
148             result = prime * result + ((tableId == null) ? 0 : tableId.hashCode());
149             return result;
150         }
151
152         @Override
153         public boolean equals(Object obj) {
154             if (this == obj)
155                 return true;
156             if (obj == null)
157                 return false;
158             if (getClass() != obj.getClass())
159                 return false;
160             FlowEntry other = (FlowEntry) obj;
161             if (!getOuterType().equals(other.getOuterType()))
162                 return false;
163             if (flow == null) {
164                 if (other.flow != null)
165                     return false;
166             } else if (!flow.equals(other.flow))
167                 return false;
168             if (tableId == null) {
169                 if (other.tableId != null)
170                     return false;
171             } else if (!tableId.equals(other.tableId))
172                 return false;
173             return true;
174         }
175
176         private NodeStatisticsHandler getOuterType() {
177             return NodeStatisticsHandler.this;
178         }
179     }
180
181     private static final class QueueEntry{
182         private final NodeConnectorId nodeConnectorId;
183         private final QueueId queueId;
184         public QueueEntry(NodeConnectorId ncId, QueueId queueId){
185             this.nodeConnectorId = ncId;
186             this.queueId = queueId;
187         }
188         public NodeConnectorId getNodeConnectorId() {
189             return nodeConnectorId;
190         }
191         public QueueId getQueueId() {
192             return queueId;
193         }
194         @Override
195         public int hashCode() {
196             final int prime = 31;
197             int result = 1;
198             result = prime * result + ((nodeConnectorId == null) ? 0 : nodeConnectorId.hashCode());
199             result = prime * result + ((queueId == null) ? 0 : queueId.hashCode());
200             return result;
201         }
202         @Override
203         public boolean equals(Object obj) {
204             if (this == obj) {
205                 return true;
206             }
207             if (obj == null) {
208                 return false;
209             }
210             if (!(obj instanceof QueueEntry)) {
211                 return false;
212             }
213             QueueEntry other = (QueueEntry) obj;
214             if (nodeConnectorId == null) {
215                 if (other.nodeConnectorId != null) {
216                     return false;
217                 }
218             } else if (!nodeConnectorId.equals(other.nodeConnectorId)) {
219                 return false;
220             }
221             if (queueId == null) {
222                 if (other.queueId != null) {
223                     return false;
224                 }
225             } else if (!queueId.equals(other.queueId)) {
226                 return false;
227             }
228             return true;
229         }
230     }
231
232     public NodeKey getTargetNodeKey() {
233         return targetNodeKey;
234     }
235
236     public synchronized void updateGroupDescStats(List<GroupDescStats> list){
237         final Long expiryTime = getExpiryTime();
238         final DataModificationTransaction trans = statisticsProvider.startChange();
239
240         for (GroupDescStats groupDescStats : list) {
241             GroupBuilder groupBuilder = new GroupBuilder();
242             GroupKey groupKey = new GroupKey(groupDescStats.getGroupId());
243             groupBuilder.setKey(groupKey);
244
245             InstanceIdentifier<Group> groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
246                                                                                         .augmentation(FlowCapableNode.class)
247                                                                                         .child(Group.class,groupKey).toInstance();
248
249             NodeGroupDescStatsBuilder groupDesc= new NodeGroupDescStatsBuilder();
250             GroupDescBuilder stats = new GroupDescBuilder();
251             stats.fieldsFrom(groupDescStats);
252             groupDesc.setGroupDesc(stats.build());
253
254             //Update augmented data
255             groupBuilder.addAugmentation(NodeGroupDescStats.class, groupDesc.build());
256
257             trans.putOperationalData(groupRef, groupBuilder.build());
258             this.groupDescStatsUpdate.put(groupDescStats, expiryTime);
259         }
260
261         trans.commit();
262     }
263
264
265     public synchronized void updateGroupStats(List<GroupStats> list) {
266         final DataModificationTransaction trans = statisticsProvider.startChange();
267
268         for(GroupStats groupStats : list) {
269             GroupBuilder groupBuilder = new GroupBuilder();
270             GroupKey groupKey = new GroupKey(groupStats.getGroupId());
271             groupBuilder.setKey(groupKey);
272
273             InstanceIdentifier<Group> groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
274                                                                                         .augmentation(FlowCapableNode.class)
275                                                                                         .child(Group.class,groupKey).toInstance();
276
277             NodeGroupStatisticsBuilder groupStatisticsBuilder= new NodeGroupStatisticsBuilder();
278             GroupStatisticsBuilder stats = new GroupStatisticsBuilder();
279             stats.fieldsFrom(groupStats);
280             groupStatisticsBuilder.setGroupStatistics(stats.build());
281
282             //Update augmented data
283             groupBuilder.addAugmentation(NodeGroupStatistics.class, groupStatisticsBuilder.build());
284             trans.putOperationalData(groupRef, groupBuilder.build());
285
286             // FIXME: should we be tracking this data?
287         }
288
289         trans.commit();
290     }
291
292     public synchronized void updateMeterConfigStats(List<MeterConfigStats> list) {
293         final Long expiryTime = getExpiryTime();
294         final DataModificationTransaction trans = statisticsProvider.startChange();
295
296         for(MeterConfigStats meterConfigStats : list) {
297             MeterBuilder meterBuilder = new MeterBuilder();
298             MeterKey meterKey = new MeterKey(meterConfigStats.getMeterId());
299             meterBuilder.setKey(meterKey);
300
301             InstanceIdentifier<Meter> meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
302                                                                                         .augmentation(FlowCapableNode.class)
303                                                                                         .child(Meter.class,meterKey).toInstance();
304
305             NodeMeterConfigStatsBuilder meterConfig= new NodeMeterConfigStatsBuilder();
306             MeterConfigStatsBuilder stats = new MeterConfigStatsBuilder();
307             stats.fieldsFrom(meterConfigStats);
308             meterConfig.setMeterConfigStats(stats.build());
309
310             //Update augmented data
311             meterBuilder.addAugmentation(NodeMeterConfigStats.class, meterConfig.build());
312
313             trans.putOperationalData(meterRef, meterBuilder.build());
314             this.meterConfigStatsUpdate.put(meterConfigStats, expiryTime);
315         }
316
317         trans.commit();
318     }
319
320
321     public synchronized void updateMeterStats(List<MeterStats> list) {
322         final DataModificationTransaction trans = statisticsProvider.startChange();
323
324         for(MeterStats meterStats : list) {
325             MeterBuilder meterBuilder = new MeterBuilder();
326             MeterKey meterKey = new MeterKey(meterStats.getMeterId());
327             meterBuilder.setKey(meterKey);
328
329             InstanceIdentifier<Meter> meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
330                                                                                         .augmentation(FlowCapableNode.class)
331                                                                                         .child(Meter.class,meterKey).toInstance();
332
333             NodeMeterStatisticsBuilder meterStatsBuilder= new NodeMeterStatisticsBuilder();
334             MeterStatisticsBuilder stats = new MeterStatisticsBuilder();
335             stats.fieldsFrom(meterStats);
336             meterStatsBuilder.setMeterStatistics(stats.build());
337
338             //Update augmented data
339             meterBuilder.addAugmentation(NodeMeterStatistics.class, meterStatsBuilder.build());
340             trans.putOperationalData(meterRef, meterBuilder.build());
341
342             // FIXME: should we be tracking this data?
343         }
344
345         trans.commit();
346     }
347
348     public synchronized void updateQueueStats(List<QueueIdAndStatisticsMap> list) {
349         final Long expiryTime = getExpiryTime();
350         final DataModificationTransaction trans = statisticsProvider.startChange();
351
352         for (QueueIdAndStatisticsMap swQueueStats : list) {
353
354             QueueEntry queueEntry = new QueueEntry(swQueueStats.getNodeConnectorId(),swQueueStats.getQueueId());
355
356             FlowCapableNodeConnectorQueueStatisticsDataBuilder queueStatisticsDataBuilder = new FlowCapableNodeConnectorQueueStatisticsDataBuilder();
357
358             FlowCapableNodeConnectorQueueStatisticsBuilder queueStatisticsBuilder = new FlowCapableNodeConnectorQueueStatisticsBuilder();
359
360             queueStatisticsBuilder.fieldsFrom(swQueueStats);
361
362             queueStatisticsDataBuilder.setFlowCapableNodeConnectorQueueStatistics(queueStatisticsBuilder.build());
363
364             InstanceIdentifier<Queue> queueRef
365                     = InstanceIdentifier.builder(Nodes.class)
366                                         .child(Node.class, targetNodeKey)
367                                         .child(NodeConnector.class, new NodeConnectorKey(swQueueStats.getNodeConnectorId()))
368                                         .augmentation(FlowCapableNodeConnector.class)
369                                         .child(Queue.class, new QueueKey(swQueueStats.getQueueId())).toInstance();
370
371             QueueBuilder queueBuilder = new QueueBuilder();
372             FlowCapableNodeConnectorQueueStatisticsData qsd = queueStatisticsDataBuilder.build();
373             queueBuilder.addAugmentation(FlowCapableNodeConnectorQueueStatisticsData.class, qsd);
374             queueBuilder.setKey(new QueueKey(swQueueStats.getQueueId()));
375
376             logger.debug("Augmenting queue statistics {} of queue {} to port {}",
377                                         qsd,
378                                         swQueueStats.getQueueId(),
379                                         swQueueStats.getNodeConnectorId());
380
381             trans.putOperationalData(queueRef, queueBuilder.build());
382             this.queuesStatsUpdate.put(queueEntry, expiryTime);
383         }
384
385         trans.commit();
386     }
387
388     public synchronized void updateFlowTableStats(List<FlowTableAndStatisticsMap> list) {
389         final DataModificationTransaction trans = statisticsProvider.startChange();
390
391         for (FlowTableAndStatisticsMap ftStats : list) {
392
393             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
394                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(ftStats.getTableId().getValue())).toInstance();
395
396             FlowTableStatisticsDataBuilder statisticsDataBuilder = new FlowTableStatisticsDataBuilder();
397
398             FlowTableStatisticsBuilder statisticsBuilder = new FlowTableStatisticsBuilder();
399             statisticsBuilder.setActiveFlows(ftStats.getActiveFlows());
400             statisticsBuilder.setPacketsLookedUp(ftStats.getPacketsLookedUp());
401             statisticsBuilder.setPacketsMatched(ftStats.getPacketsMatched());
402
403             final FlowTableStatistics stats = statisticsBuilder.build();
404             statisticsDataBuilder.setFlowTableStatistics(stats);
405
406             logger.debug("Augment flow table statistics: {} for table {} on Node {}",
407                     stats,ftStats.getTableId(), targetNodeKey);
408
409             TableBuilder tableBuilder = new TableBuilder();
410             tableBuilder.setKey(new TableKey(ftStats.getTableId().getValue()));
411             tableBuilder.addAugmentation(FlowTableStatisticsData.class, statisticsDataBuilder.build());
412             trans.putOperationalData(tableRef, tableBuilder.build());
413
414             // FIXME: should we be tracking this data?
415         }
416
417         trans.commit();
418     }
419
420     public synchronized void updateNodeConnectorStats(List<NodeConnectorStatisticsAndPortNumberMap> list) {
421         final DataModificationTransaction trans = statisticsProvider.startChange();
422
423         for(NodeConnectorStatisticsAndPortNumberMap portStats : list) {
424
425             FlowCapableNodeConnectorStatisticsBuilder statisticsBuilder
426                                             = new FlowCapableNodeConnectorStatisticsBuilder();
427             statisticsBuilder.setBytes(portStats.getBytes());
428             statisticsBuilder.setCollisionCount(portStats.getCollisionCount());
429             statisticsBuilder.setDuration(portStats.getDuration());
430             statisticsBuilder.setPackets(portStats.getPackets());
431             statisticsBuilder.setReceiveCrcError(portStats.getReceiveCrcError());
432             statisticsBuilder.setReceiveDrops(portStats.getReceiveDrops());
433             statisticsBuilder.setReceiveErrors(portStats.getReceiveErrors());
434             statisticsBuilder.setReceiveFrameError(portStats.getReceiveFrameError());
435             statisticsBuilder.setReceiveOverRunError(portStats.getReceiveOverRunError());
436             statisticsBuilder.setTransmitDrops(portStats.getTransmitDrops());
437             statisticsBuilder.setTransmitErrors(portStats.getTransmitErrors());
438
439             //Augment data to the node-connector
440             FlowCapableNodeConnectorStatisticsDataBuilder statisticsDataBuilder =
441                     new FlowCapableNodeConnectorStatisticsDataBuilder();
442
443             statisticsDataBuilder.setFlowCapableNodeConnectorStatistics(statisticsBuilder.build());
444
445             InstanceIdentifier<NodeConnector> nodeConnectorRef = InstanceIdentifier.builder(Nodes.class)
446                     .child(Node.class, targetNodeKey)
447                     .child(NodeConnector.class, new NodeConnectorKey(portStats.getNodeConnectorId())).toInstance();
448
449             // FIXME: can we bypass this read?
450             NodeConnector nodeConnector = (NodeConnector)trans.readOperationalData(nodeConnectorRef);
451             if(nodeConnector != null){
452                 final FlowCapableNodeConnectorStatisticsData stats = statisticsDataBuilder.build();
453                 logger.debug("Augmenting port statistics {} to port {}",stats,nodeConnectorRef.toString());
454                 NodeConnectorBuilder nodeConnectorBuilder = new NodeConnectorBuilder();
455                 nodeConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, stats);
456                 trans.putOperationalData(nodeConnectorRef, nodeConnectorBuilder.build());
457             }
458
459             // FIXME: should we be tracking this data?
460         }
461
462         trans.commit();
463     }
464
465     public synchronized void updateAggregateFlowStats(Short tableId, AggregateFlowStatistics flowStats) {
466         if (tableId != null) {
467             final DataModificationTransaction trans = statisticsProvider.startChange();
468
469
470             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
471                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
472
473             AggregateFlowStatisticsDataBuilder aggregateFlowStatisticsDataBuilder = new AggregateFlowStatisticsDataBuilder();
474             AggregateFlowStatisticsBuilder aggregateFlowStatisticsBuilder = new AggregateFlowStatisticsBuilder(flowStats);
475
476             aggregateFlowStatisticsDataBuilder.setAggregateFlowStatistics(aggregateFlowStatisticsBuilder.build());
477
478             logger.debug("Augment aggregate statistics: {} for table {} on Node {}",
479                     aggregateFlowStatisticsBuilder.build().toString(),tableId,targetNodeKey);
480
481             TableBuilder tableBuilder = new TableBuilder();
482             tableBuilder.setKey(new TableKey(tableId));
483             tableBuilder.addAugmentation(AggregateFlowStatisticsData.class, aggregateFlowStatisticsDataBuilder.build());
484             trans.putOperationalData(tableRef, tableBuilder.build());
485
486             // FIXME: should we be tracking this data?
487             trans.commit();
488         }
489     }
490
491     public synchronized void updateGroupFeatures(GroupFeatures notification) {
492         final DataModificationTransaction trans = statisticsProvider.startChange();
493
494         final NodeBuilder nodeData = new NodeBuilder();
495         nodeData.setKey(targetNodeKey);
496
497         NodeGroupFeaturesBuilder nodeGroupFeatures = new NodeGroupFeaturesBuilder();
498         GroupFeaturesBuilder groupFeatures = new GroupFeaturesBuilder(notification);
499         nodeGroupFeatures.setGroupFeatures(groupFeatures.build());
500
501         //Update augmented data
502         nodeData.addAugmentation(NodeGroupFeatures.class, nodeGroupFeatures.build());
503         trans.putOperationalData(targetNodeIdentifier, nodeData.build());
504
505         // FIXME: should we be tracking this data?
506         trans.commit();
507     }
508
509     public synchronized void updateMeterFeatures(MeterFeatures features) {
510         final DataModificationTransaction trans = statisticsProvider.startChange();
511
512         final NodeBuilder nodeData = new NodeBuilder();
513         nodeData.setKey(targetNodeKey);
514
515         NodeMeterFeaturesBuilder nodeMeterFeatures = new NodeMeterFeaturesBuilder();
516         MeterFeaturesBuilder meterFeature = new MeterFeaturesBuilder(features);
517         nodeMeterFeatures.setMeterFeatures(meterFeature.build());
518
519         //Update augmented data
520         nodeData.addAugmentation(NodeMeterFeatures.class, nodeMeterFeatures.build());
521         trans.putOperationalData(targetNodeIdentifier, nodeData.build());
522
523         // FIXME: should we be tracking this data?
524         trans.commit();
525     }
526
527     public synchronized void updateFlowStats(List<FlowAndStatisticsMapList> list) {
528         final Long expiryTime = getExpiryTime();
529         final DataModificationTransaction trans = statisticsProvider.startChange();
530
531         for(FlowAndStatisticsMapList map : list) {
532             short tableId = map.getTableId();
533             boolean foundOriginalFlow = false;
534
535             FlowBuilder flowBuilder = new FlowBuilder();
536
537             FlowStatisticsDataBuilder flowStatisticsData = new FlowStatisticsDataBuilder();
538
539             FlowBuilder flow = new FlowBuilder();
540             flow.setContainerName(map.getContainerName());
541             flow.setBufferId(map.getBufferId());
542             flow.setCookie(map.getCookie());
543             flow.setCookieMask(map.getCookieMask());
544             flow.setFlags(map.getFlags());
545             flow.setFlowName(map.getFlowName());
546             flow.setHardTimeout(map.getHardTimeout());
547             if(map.getFlowId() != null)
548                 flow.setId(new FlowId(map.getFlowId().getValue()));
549             flow.setIdleTimeout(map.getIdleTimeout());
550             flow.setInstallHw(map.isInstallHw());
551             flow.setInstructions(map.getInstructions());
552             if(map.getFlowId()!= null)
553                 flow.setKey(new FlowKey(new FlowId(map.getKey().getFlowId().getValue())));
554             flow.setMatch(map.getMatch());
555             flow.setOutGroup(map.getOutGroup());
556             flow.setOutPort(map.getOutPort());
557             flow.setPriority(map.getPriority());
558             flow.setStrict(map.isStrict());
559             flow.setTableId(tableId);
560
561             Flow flowRule = flow.build();
562
563             FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder();
564             stats.setByteCount(map.getByteCount());
565             stats.setPacketCount(map.getPacketCount());
566             stats.setDuration(map.getDuration());
567
568             GenericStatistics flowStats = stats.build();
569
570             //Augment the data to the flow node
571
572             FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder();
573             flowStatistics.setByteCount(flowStats.getByteCount());
574             flowStatistics.setPacketCount(flowStats.getPacketCount());
575             flowStatistics.setDuration(flowStats.getDuration());
576             flowStatistics.setContainerName(map.getContainerName());
577             flowStatistics.setBufferId(map.getBufferId());
578             flowStatistics.setCookie(map.getCookie());
579             flowStatistics.setCookieMask(map.getCookieMask());
580             flowStatistics.setFlags(map.getFlags());
581             flowStatistics.setFlowName(map.getFlowName());
582             flowStatistics.setHardTimeout(map.getHardTimeout());
583             flowStatistics.setIdleTimeout(map.getIdleTimeout());
584             flowStatistics.setInstallHw(map.isInstallHw());
585             flowStatistics.setInstructions(map.getInstructions());
586             flowStatistics.setMatch(map.getMatch());
587             flowStatistics.setOutGroup(map.getOutGroup());
588             flowStatistics.setOutPort(map.getOutPort());
589             flowStatistics.setPriority(map.getPriority());
590             flowStatistics.setStrict(map.isStrict());
591             flowStatistics.setTableId(tableId);
592
593             flowStatisticsData.setFlowStatistics(flowStatistics.build());
594
595             logger.debug("Flow : {}",flowRule.toString());
596             logger.debug("Statistics to augment : {}",flowStatistics.build().toString());
597
598             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
599                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
600
601             Table table= (Table)trans.readConfigurationData(tableRef);
602
603             //TODO: Not a good way to do it, need to figure out better way.
604             //TODO: major issue in any alternate approach is that flow key is incrementally assigned
605             //to the flows stored in data store.
606             // Augment same statistics to all the matching masked flow
607             if(table != null){
608
609                 for(Flow existingFlow : table.getFlow()){
610                     logger.debug("Existing flow in data store : {}",existingFlow.toString());
611                     if(FlowComparator.flowEquals(flowRule,existingFlow)){
612                         InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
613                                 .augmentation(FlowCapableNode.class)
614                                 .child(Table.class, new TableKey(tableId))
615                                 .child(Flow.class,existingFlow.getKey()).toInstance();
616                         flowBuilder.setKey(existingFlow.getKey());
617                         flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
618                         logger.debug("Found matching flow in the datastore, augmenting statistics");
619                         foundOriginalFlow = true;
620                         // Update entry with timestamp of latest response
621                         flow.setKey(existingFlow.getKey());
622                         FlowEntry flowStatsEntry = new FlowEntry(tableId,flow.build());
623                         flowStatsUpdate.put(flowStatsEntry, expiryTime);
624
625                         trans.putOperationalData(flowRef, flowBuilder.build());
626                     }
627                 }
628             }
629
630             table = (Table)trans.readOperationalData(tableRef);
631             if(!foundOriginalFlow && table != null){
632
633                 for(Flow existingFlow : table.getFlow()){
634                     FlowStatisticsData augmentedflowStatisticsData = existingFlow.getAugmentation(FlowStatisticsData.class);
635                     if(augmentedflowStatisticsData != null){
636                         FlowBuilder existingOperationalFlow = new FlowBuilder();
637                         existingOperationalFlow.fieldsFrom(augmentedflowStatisticsData.getFlowStatistics());
638                         logger.debug("Existing unaccounted flow in operational data store : {}",existingFlow.toString());
639                         if(FlowComparator.flowEquals(flowRule,existingOperationalFlow.build())){
640                             InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
641                                     .augmentation(FlowCapableNode.class)
642                                     .child(Table.class, new TableKey(tableId))
643                                     .child(Flow.class,existingFlow.getKey()).toInstance();
644                             flowBuilder.setKey(existingFlow.getKey());
645                             flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
646                             logger.debug("Found matching unaccounted flow in the operational datastore, augmenting statistics");
647                             foundOriginalFlow = true;
648
649                             // Update entry with timestamp of latest response
650                             flow.setKey(existingFlow.getKey());
651                             FlowEntry flowStatsEntry = new FlowEntry(tableId,flow.build());
652                             flowStatsUpdate.put(flowStatsEntry, expiryTime);
653                             trans.putOperationalData(flowRef, flowBuilder.build());
654                             break;
655                         }
656                     }
657                 }
658             }
659             if(!foundOriginalFlow){
660                 String flowKey = "#UF$TABLE*"+Short.toString(tableId)+"*"+Integer.toString(this.unaccountedFlowsCounter);
661                 this.unaccountedFlowsCounter++;
662                 FlowKey newFlowKey = new FlowKey(new FlowId(flowKey));
663                 InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
664                         .augmentation(FlowCapableNode.class)
665                         .child(Table.class, new TableKey(tableId))
666                         .child(Flow.class,newFlowKey).toInstance();
667                 flowBuilder.setKey(newFlowKey);
668                 flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
669                 logger.debug("Flow {} is not present in config data store, augmenting statistics as an unaccounted flow",
670                         flowBuilder.build());
671
672                 // Update entry with timestamp of latest response
673                 flow.setKey(newFlowKey);
674                 FlowEntry flowStatsEntry = new FlowEntry(tableId,flow.build());
675                 flowStatsUpdate.put(flowStatsEntry, expiryTime);
676                 trans.putOperationalData(flowRef, flowBuilder.build());
677             }
678         }
679
680         trans.commit();
681     }
682
683     private static Long getExpiryTime(){
684         final long now = System.nanoTime();
685         return now + TimeUnit.MILLISECONDS.toNanos(StatisticsProvider.STATS_THREAD_EXECUTION_TIME * NUMBER_OF_WAIT_CYCLES);
686     }
687
688     public synchronized void cleanStaleStatistics(){
689         final DataModificationTransaction trans = this.statisticsProvider.startChange();
690         final long now = System.nanoTime();
691
692         //Clean stale statistics related to group
693         for (Iterator<Entry<GroupDescStats, Long>> it = this.groupDescStatsUpdate.entrySet().iterator();it.hasNext();){
694             Entry<GroupDescStats, Long> e = it.next();
695             if (now > e.getValue()) {
696                 cleanGroupStatsFromDataStore(trans, e.getKey());
697                 it.remove();
698             }
699         }
700
701         //Clean stale statistics related to meter
702         for (Iterator<Entry<MeterConfigStats, Long>> it = this.meterConfigStatsUpdate.entrySet().iterator();it.hasNext();){
703             Entry<MeterConfigStats, Long> e = it.next();
704             if (now > e.getValue()) {
705                 cleanMeterStatsFromDataStore(trans, e.getKey());
706                 it.remove();
707             }
708         }
709
710         //Clean stale statistics related to flow
711         for (Iterator<Entry<FlowEntry, Long>> it = this.flowStatsUpdate.entrySet().iterator();it.hasNext();){
712             Entry<FlowEntry, Long> e = it.next();
713             if (now > e.getValue()) {
714                 cleanFlowStatsFromDataStore(trans, e.getKey());
715                 it.remove();
716             }
717         }
718
719         //Clean stale statistics related to queue
720         for (Iterator<Entry<QueueEntry, Long>> it = this.queuesStatsUpdate.entrySet().iterator();it.hasNext();){
721             Entry<QueueEntry, Long> e = it.next();
722             if (now > e.getValue()) {
723                 cleanQueueStatsFromDataStore(trans, e.getKey());
724                 it.remove();
725             }
726         }
727
728         trans.commit();
729     }
730
731     private void cleanQueueStatsFromDataStore(DataModificationTransaction trans, QueueEntry queueEntry) {
732         InstanceIdentifier<?> queueRef
733                         = InstanceIdentifier.builder(Nodes.class)
734                                             .child(Node.class, this.targetNodeKey)
735                                             .child(NodeConnector.class, new NodeConnectorKey(queueEntry.getNodeConnectorId()))
736                                             .augmentation(FlowCapableNodeConnector.class)
737                                             .child(Queue.class, new QueueKey(queueEntry.getQueueId()))
738                                             .augmentation(FlowCapableNodeConnectorQueueStatisticsData.class).toInstance();
739         trans.removeOperationalData(queueRef);
740     }
741
742     private void cleanFlowStatsFromDataStore(DataModificationTransaction trans, FlowEntry flowEntry) {
743         InstanceIdentifier<?> flowRef
744                         = InstanceIdentifier.builder(Nodes.class).child(Node.class, this.targetNodeKey)
745                                             .augmentation(FlowCapableNode.class)
746                                             .child(Table.class, new TableKey(flowEntry.getTableId()))
747                                             .child(Flow.class,flowEntry.getFlow().getKey())
748                                             .augmentation(FlowStatisticsData.class).toInstance();
749         trans.removeOperationalData(flowRef);
750     }
751
752     private void cleanMeterStatsFromDataStore(DataModificationTransaction trans, MeterConfigStats meterConfigStats) {
753         InstanceIdentifierBuilder<Meter> meterRef
754                         = InstanceIdentifier.builder(Nodes.class).child(Node.class,this.targetNodeKey)
755                                             .augmentation(FlowCapableNode.class)
756                                             .child(Meter.class,new MeterKey(meterConfigStats.getMeterId()));
757
758         InstanceIdentifier<?> nodeMeterConfigStatsAugmentation = meterRef.augmentation(NodeMeterConfigStats.class).toInstance();
759         trans.removeOperationalData(nodeMeterConfigStatsAugmentation);
760
761         InstanceIdentifier<?> nodeMeterStatisticsAugmentation = meterRef.augmentation(NodeMeterStatistics.class).toInstance();
762         trans.removeOperationalData(nodeMeterStatisticsAugmentation);
763     }
764
765     private void cleanGroupStatsFromDataStore(DataModificationTransaction trans, GroupDescStats groupDescStats) {
766         InstanceIdentifierBuilder<Group> groupRef
767                         = InstanceIdentifier.builder(Nodes.class).child(Node.class,this.targetNodeKey)
768                                             .augmentation(FlowCapableNode.class)
769                                             .child(Group.class,new GroupKey(groupDescStats.getGroupId()));
770
771         InstanceIdentifier<?> nodeGroupDescStatsAugmentation = groupRef.augmentation(NodeGroupDescStats.class).toInstance();
772         trans.removeOperationalData(nodeGroupDescStatsAugmentation);
773
774         InstanceIdentifier<?> nodeGroupStatisticsAugmentation = groupRef.augmentation(NodeGroupStatistics.class).toInstance();
775         trans.removeOperationalData(nodeGroupStatisticsAugmentation);
776     }
777 }