MD-SAL Statistics Manager - Adding support for individual flow stats
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / StatisticsUpdateCommiter.java
1 /*
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.statistics.manager;
9
10 import java.util.HashMap;
11 import java.util.concurrent.ConcurrentMap;
12
13 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
14 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsUpdated;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowTableStatisticsUpdated;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.NodeConnectorStatisticsUpdated;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupDescStatsUpdated;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupFeaturesUpdated;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupStatisticsUpdated;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStatsBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeaturesBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatisticsBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.desc.GroupDescBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeaturesBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.statistics.GroupStatisticsBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterConfigStatsUpdated;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterFeaturesUpdated;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterStatisticsUpdated;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStats;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStatsBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeatures;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeaturesBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatisticsBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterConfigStatsBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeaturesBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterStatisticsBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
69 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
70 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73
74 public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsListener,
75         OpendaylightMeterStatisticsListener, 
76         OpendaylightFlowStatisticsListener{
77     
78     public final static Logger sucLogger = LoggerFactory.getLogger(StatisticsUpdateCommiter.class);
79
80     private final StatisticsProvider statisticsManager;
81     
82     private final int unaccountedFlowsCounter = 1;
83
84     public StatisticsUpdateCommiter(final StatisticsProvider manager){
85
86         this.statisticsManager = manager;
87     }
88     
89     public StatisticsProvider getStatisticsManager(){
90         return statisticsManager;
91     }
92    
93     @Override
94     public void onMeterConfigStatsUpdated(MeterConfigStatsUpdated notification) {
95
96         //Add statistics to local cache
97         ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
98         if(!cache.containsKey(notification.getId())){
99             cache.put(notification.getId(), new NodeStatistics());
100         }
101         cache.get(notification.getId()).setMeterConfigStats(notification.getMeterConfigStats());
102         
103         //Publish data to configuration data store
104         DataModificationTransaction it = this.statisticsManager.startChange();
105         NodeKey key = new NodeKey(notification.getId());
106         NodeRef ref = getNodeRef(key);
107         
108         final NodeBuilder nodeData = new NodeBuilder(); 
109         nodeData.setKey(key);
110         
111         NodeMeterConfigStatsBuilder meterConfig= new NodeMeterConfigStatsBuilder();
112         MeterConfigStatsBuilder stats = new MeterConfigStatsBuilder();
113         stats.setMeterConfigStats(notification.getMeterConfigStats());
114         meterConfig.setMeterConfigStats(stats.build());
115         
116         //Update augmented data
117         nodeData.addAugmentation(NodeMeterConfigStats.class, meterConfig.build());
118         
119         InstanceIdentifier<? extends Object> refValue = ref.getValue();
120         it.putOperationalData(refValue, nodeData.build());
121         it.commit();
122
123     }
124
125     @Override
126     public void onMeterStatisticsUpdated(MeterStatisticsUpdated notification) {
127         //Add statistics to local cache
128         ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
129         if(!cache.containsKey(notification.getId())){
130             cache.put(notification.getId(), new NodeStatistics());
131         }
132         cache.get(notification.getId()).setMeterStatistics(notification.getMeterStats());
133         
134         //Publish data to configuration data store
135         DataModificationTransaction it = this.statisticsManager.startChange();
136         NodeKey key = new NodeKey(notification.getId());
137         NodeRef ref = getNodeRef(key);
138         
139         final NodeBuilder nodeData = new NodeBuilder(); 
140         nodeData.setKey(key);
141         
142         NodeMeterStatisticsBuilder meterStats= new NodeMeterStatisticsBuilder();
143         MeterStatisticsBuilder stats = new MeterStatisticsBuilder();
144         stats.setMeterStats(notification.getMeterStats());
145         meterStats.setMeterStatistics(stats.build());
146         
147         //Update augmented data
148         nodeData.addAugmentation(NodeMeterStatistics.class, meterStats.build());
149         
150         InstanceIdentifier<? extends Object> refValue = ref.getValue();
151         it.putOperationalData(refValue, nodeData.build());
152         it.commit();
153
154     }
155
156     @Override
157     public void onGroupDescStatsUpdated(GroupDescStatsUpdated notification) {
158         //Add statistics to local cache
159         ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
160         if(!cache.containsKey(notification.getId())){
161             cache.put(notification.getId(), new NodeStatistics());
162         }
163         cache.get(notification.getId()).setGroupDescStats(notification.getGroupDescStats());
164         
165         //Publish data to configuration data store
166         DataModificationTransaction it = this.statisticsManager.startChange();
167         NodeKey key = new NodeKey(notification.getId());
168         NodeRef ref = getNodeRef(key);
169         
170         final NodeBuilder nodeData = new NodeBuilder(); 
171         nodeData.setKey(key);
172         
173         NodeGroupDescStatsBuilder groupDesc= new NodeGroupDescStatsBuilder();
174         GroupDescBuilder stats = new GroupDescBuilder();
175         stats.setGroupDescStats(notification.getGroupDescStats());
176         groupDesc.setGroupDesc(stats.build());
177         
178         //Update augmented data
179         nodeData.addAugmentation(NodeGroupDescStats.class, groupDesc.build());
180
181         InstanceIdentifier<? extends Object> refValue = ref.getValue();
182         it.putOperationalData(refValue, nodeData.build());
183         it.commit();
184
185     }
186
187     @Override
188     public void onGroupStatisticsUpdated(GroupStatisticsUpdated notification) {
189         
190         //Add statistics to local cache
191         ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
192         if(!cache.containsKey(notification.getId())){
193             cache.put(notification.getId(), new NodeStatistics());
194         }
195         cache.get(notification.getId()).setGroupStatistics(notification.getGroupStats());
196         
197         //Publish data to configuration data store
198         
199         DataModificationTransaction it = this.statisticsManager.startChange();
200         NodeKey key = new NodeKey(notification.getId());
201         NodeRef ref = getNodeRef(key);
202
203         final NodeBuilder nodeData = new NodeBuilder(); 
204         nodeData.setKey(key);
205         
206         NodeGroupStatisticsBuilder groupStats = new NodeGroupStatisticsBuilder();
207         GroupStatisticsBuilder stats = new GroupStatisticsBuilder();
208         stats.setGroupStats(notification.getGroupStats());
209         groupStats.setGroupStatistics(stats.build());
210                 
211         //Update augmented data
212         nodeData.addAugmentation(NodeGroupStatistics.class, groupStats.build());
213
214         InstanceIdentifier<? extends Object> refValue = ref.getValue();
215         it.putOperationalData(refValue, nodeData.build());
216         it.commit();
217
218 //        for (GroupStats groupstat : notification.getGroupStats()) {
219 //        
220 //            GroupStatsKey groupKey = groupstat.getKey();
221 //            InstanceIdentifier<? extends Object> id = InstanceIdentifier.builder(Nodes.class).child(Node.class, key).augmentation(NodeGroupStatistics.class).child(GroupStatistics.class).child(GroupStats.class,groupKey).toInstance();
222 //            it.putOperationalData(id, groupstat);
223 //            it.commit();
224 //        }
225     }
226     
227     @Override
228     public void onMeterFeaturesUpdated(MeterFeaturesUpdated notification) {
229
230         //Add statistics to local cache
231         ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
232         if(!cache.containsKey(notification.getId())){
233             cache.put(notification.getId(), new NodeStatistics());
234         }
235         MeterFeaturesBuilder meterFeature = new MeterFeaturesBuilder();
236         meterFeature.setMeterBandSupported(notification.getMeterBandSupported());
237         meterFeature.setMeterCapabilitiesSupported(notification.getMeterCapabilitiesSupported());
238         meterFeature.setMaxBands(notification.getMaxBands());
239         meterFeature.setMaxColor(notification.getMaxColor());
240         meterFeature.setMaxMeter(notification.getMaxMeter());
241         
242         cache.get(notification.getId()).setMeterFeatures(meterFeature.build());
243         
244         //Publish data to configuration data store
245         DataModificationTransaction it = this.statisticsManager.startChange();
246         NodeKey key = new NodeKey(notification.getId());
247         NodeRef ref = getNodeRef(key);
248         
249         final NodeBuilder nodeData = new NodeBuilder(); 
250         nodeData.setKey(key);
251         
252         NodeMeterFeaturesBuilder nodeMeterFeatures= new NodeMeterFeaturesBuilder();
253         nodeMeterFeatures.setMeterFeatures(meterFeature.build());
254         
255         //Update augmented data
256         nodeData.addAugmentation(NodeMeterFeatures.class, nodeMeterFeatures.build());
257         
258         InstanceIdentifier<? extends Object> refValue = ref.getValue();
259         it.putOperationalData(refValue, nodeData.build());
260         it.commit();
261     }
262     
263     @Override
264     public void onGroupFeaturesUpdated(GroupFeaturesUpdated notification) {
265
266         //Add statistics to local cache
267         ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
268         if(!cache.containsKey(notification.getId())){
269             cache.put(notification.getId(), new NodeStatistics());
270         }
271         
272         GroupFeaturesBuilder groupFeatures = new GroupFeaturesBuilder();
273         groupFeatures.setActions(notification.getActions());
274         groupFeatures.setGroupCapabilitiesSupported(notification.getGroupCapabilitiesSupported());
275         groupFeatures.setGroupTypesSupported(notification.getGroupTypesSupported());
276         groupFeatures.setMaxGroups(notification.getMaxGroups());
277         cache.get(notification.getId()).setGroupFeatures(groupFeatures.build());
278         
279         //Publish data to configuration data store
280         DataModificationTransaction it = this.statisticsManager.startChange();
281         NodeKey key = new NodeKey(notification.getId());
282         NodeRef ref = getNodeRef(key);
283         
284         final NodeBuilder nodeData = new NodeBuilder(); 
285         nodeData.setKey(key);
286         
287         NodeGroupFeaturesBuilder nodeGroupFeatures= new NodeGroupFeaturesBuilder();
288         nodeGroupFeatures.setGroupFeatures(groupFeatures.build());
289         
290         //Update augmented data
291         nodeData.addAugmentation(NodeGroupFeatures.class, nodeGroupFeatures.build());
292         
293         InstanceIdentifier<? extends Object> refValue = ref.getValue();
294         it.putOperationalData(refValue, nodeData.build());
295         it.commit();
296     }
297
298     @Override
299     public void onFlowsStatisticsUpdate(FlowsStatisticsUpdate notification) {
300         NodeKey key = new NodeKey(notification.getId());
301         sucLogger.info("Received flow stats update : {}",notification.toString());
302         
303         for(FlowAndStatisticsMapList map: notification.getFlowAndStatisticsMapList()){
304             short tableId = map.getTableId();
305             
306             DataModificationTransaction it = this.statisticsManager.startChange();
307
308             boolean foundOriginalFlow = false;
309
310             FlowBuilder flowBuilder = new FlowBuilder();
311
312             FlowStatisticsDataBuilder flowStatisticsData = new FlowStatisticsDataBuilder();
313
314             FlowBuilder flow = new FlowBuilder();
315             flow.setContainerName(map.getContainerName());
316             flow.setBufferId(map.getBufferId());
317             flow.setCookie(map.getCookie());
318             flow.setCookieMask(map.getCookieMask());
319             flow.setFlags(map.getFlags());
320             flow.setFlowName(map.getFlowName());
321             flow.setHardTimeout(map.getHardTimeout());
322             if(map.getFlowId() != null)
323                 flow.setId(new FlowId(map.getFlowId().getValue()));
324             flow.setIdleTimeout(map.getIdleTimeout());
325             flow.setInstallHw(map.isInstallHw());
326             flow.setInstructions(map.getInstructions());
327             if(map.getFlowId()!= null)
328                 flow.setKey(new FlowKey(new FlowId(map.getKey().getFlowId().getValue())));
329             flow.setMatch(map.getMatch());
330             flow.setOutGroup(map.getOutGroup());
331             flow.setOutPort(map.getOutPort());
332             flow.setPriority(map.getPriority());
333             flow.setStrict(map.isStrict());
334             flow.setTableId(tableId);
335                 
336             Flow flowRule = flow.build();
337                 
338             FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder();
339             stats.setByteCount(map.getByteCount());
340             stats.setPacketCount(map.getPacketCount());
341             stats.setDuration(map.getDuration());
342                 
343             GenericStatistics flowStats = stats.build();
344                 
345             //Add statistics to local cache
346             ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
347             if(!cache.containsKey(notification.getId())){
348                 cache.put(notification.getId(), new NodeStatistics());
349             }
350             if(!cache.get(notification.getId()).getFlowAndStatsMap().containsKey(tableId)){
351                 cache.get(notification.getId()).getFlowAndStatsMap().put(tableId, new HashMap<Flow,GenericStatistics>());
352             }
353             cache.get(notification.getId()).getFlowAndStatsMap().get(tableId).put(flowRule,flowStats);
354                 
355             //Augment the data to the flow node
356
357             FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder();
358             flowStatistics.setByteCount(flowStats.getByteCount());
359             flowStatistics.setPacketCount(flowStats.getPacketCount());
360             flowStatistics.setDuration(flowStats.getDuration());
361             flowStatistics.setContainerName(map.getContainerName());
362             flowStatistics.setBufferId(map.getBufferId());
363             flowStatistics.setCookie(map.getCookie());
364             flowStatistics.setCookieMask(map.getCookieMask());
365             flowStatistics.setFlags(map.getFlags());
366             flowStatistics.setFlowName(map.getFlowName());
367             flowStatistics.setHardTimeout(map.getHardTimeout());
368             flowStatistics.setIdleTimeout(map.getIdleTimeout());
369             flowStatistics.setInstallHw(map.isInstallHw());
370             flowStatistics.setInstructions(map.getInstructions());
371             flowStatistics.setMatch(map.getMatch());
372             flowStatistics.setOutGroup(map.getOutGroup());
373             flowStatistics.setOutPort(map.getOutPort());
374             flowStatistics.setPriority(map.getPriority());
375             flowStatistics.setStrict(map.isStrict());
376             flowStatistics.setTableId(tableId);
377
378             flowStatisticsData.setFlowStatistics(flowStatistics.build());
379                 
380             sucLogger.info("Flow : {}",flowRule.toString());
381             sucLogger.info("Statistics to augment : {}",flowStatistics.build().toString());
382
383             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
384                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
385             
386             Table table= (Table)it.readConfigurationData(tableRef);
387
388             //TODO: Not a good way to do it, need to figure out better way.
389             //TODO: major issue in any alternate approach is that flow key is incrementally assigned 
390             //to the flows stored in data store.
391             if(table != null){
392
393                 for(Flow existingFlow : table.getFlow()){
394                     sucLogger.debug("Existing flow in data store : {}",existingFlow.toString());
395                     if(flowEquals(flowRule,existingFlow)){
396                         InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
397                                 .augmentation(FlowCapableNode.class)
398                                 .child(Table.class, new TableKey(tableId))
399                                 .child(Flow.class,existingFlow.getKey()).toInstance();
400                         flowBuilder.setKey(existingFlow.getKey());
401                         flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
402                         sucLogger.debug("Found matching flow in the datastore, augmenting statistics");
403                         foundOriginalFlow = true;
404                         it.putOperationalData(flowRef, flowBuilder.build());
405                         it.commit();
406                         break;
407                     }
408                 }
409             }
410             
411             if(!foundOriginalFlow){
412                 sucLogger.info("Associated original flow is not found in data store. Augmenting flow in operational data st");
413                 //TODO: Temporary fix: format [ 0+tableid+0+unaccounted flow counter]
414                 long flowKey = Long.getLong(new String("0"+Short.toString(tableId)+"0"+Integer.toString(this.unaccountedFlowsCounter)));
415                 FlowKey newFlowKey = new FlowKey(new FlowId(flowKey));
416                 InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
417                         .augmentation(FlowCapableNode.class)
418                         .child(Table.class, new TableKey(tableId))
419                         .child(Flow.class,newFlowKey).toInstance();
420                 flowBuilder.setKey(newFlowKey);
421                 flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
422                 sucLogger.debug("Flow was no present in data store, augmenting statistics as an unaccounted flow");
423                 it.putOperationalData(flowRef, flowBuilder.build());
424                 it.commit();
425             }
426         }
427     }
428
429     @Override
430     public void onAggregateFlowStatisticsUpdate(AggregateFlowStatisticsUpdate notification) {
431         NodeKey key = new NodeKey(notification.getId());
432         sucLogger.info("Received aggregate flow statistics update : {}",notification.toString());
433         
434         Short tableId = this.statisticsManager.getMultipartMessageManager().getTableIdForTxId(notification.getTransactionId());
435         if(tableId != null){
436             
437             DataModificationTransaction it = this.statisticsManager.startChange();
438
439             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
440                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
441
442             AggregateFlowStatisticsDataBuilder aggregateFlowStatisticsDataBuilder = new AggregateFlowStatisticsDataBuilder();
443             AggregateFlowStatisticsBuilder aggregateFlowStatisticsBuilder = new AggregateFlowStatisticsBuilder();
444             aggregateFlowStatisticsBuilder.setByteCount(notification.getByteCount());
445             aggregateFlowStatisticsBuilder.setFlowCount(notification.getFlowCount());
446             aggregateFlowStatisticsBuilder.setPacketCount(notification.getPacketCount());
447             aggregateFlowStatisticsDataBuilder.setAggregateFlowStatistics(aggregateFlowStatisticsBuilder.build());
448             
449             ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
450             if(!cache.containsKey(notification.getId())){
451                 cache.put(notification.getId(), new NodeStatistics());
452             }
453             cache.get(notification.getId()).getTableAndAggregateFlowStatsMap().put(tableId,aggregateFlowStatisticsBuilder.build());
454             
455             sucLogger.info("Augment aggregate statistics: {} for table {} on Node {}",aggregateFlowStatisticsBuilder.build().toString(),tableId,key);
456
457             TableBuilder tableBuilder = new TableBuilder();
458             tableBuilder.setKey(new TableKey(tableId));
459             tableBuilder.addAugmentation(AggregateFlowStatisticsData.class, aggregateFlowStatisticsDataBuilder.build());
460             it.putOperationalData(tableRef, tableBuilder.build());
461             it.commit();
462
463         }
464     }
465
466     @Override
467     public void onFlowStatisticsUpdated(FlowStatisticsUpdated notification) {
468         // TODO Auto-generated method stub
469         //TODO: Depricated, will clean it up once sal-compatibility is fixed.
470         //Sal-Compatibility code usage this notification event.
471         
472     }
473
474     @Override
475     public void onFlowTableStatisticsUpdated(FlowTableStatisticsUpdated notification) {
476         // TODO Auto-generated method stub
477         //TODO: Need to implement it yet
478         
479     }
480
481     @Override
482     public void onNodeConnectorStatisticsUpdated(NodeConnectorStatisticsUpdated notification) {
483         // TODO Auto-generated method stub
484         //TODO: Need to implement it yet
485         
486     }
487
488
489
490     private NodeRef getNodeRef(NodeKey nodeKey){
491         InstanceIdentifierBuilder<?> builder = InstanceIdentifier.builder(Nodes.class).child(Node.class, nodeKey);
492         return new NodeRef(builder.toInstance());
493     }
494     
495     public boolean flowEquals(Flow statsFlow, Flow storedFlow) {
496         if (statsFlow.getClass() != storedFlow.getClass()) {
497             return false;
498         }
499         if (statsFlow.getBufferId()== null) {
500             if (storedFlow.getBufferId() != null) {
501                 return false;
502             }
503         } else if(!statsFlow.getBufferId().equals(storedFlow.getBufferId())) {
504             return false;
505         }
506         if (statsFlow.getContainerName()== null) {
507             if (storedFlow.getContainerName()!= null) {
508                 return false;
509             }
510         } else if(!statsFlow.getContainerName().equals(storedFlow.getContainerName())) {
511             return false;
512         }
513         if (statsFlow.getCookie()== null) {
514             if (storedFlow.getCookie()!= null) {
515                 return false;
516             }
517         } else if(!statsFlow.getCookie().equals(storedFlow.getCookie())) {
518             return false;
519         }
520         if (statsFlow.getMatch()== null) {
521             if (storedFlow.getMatch() != null) {
522                 return false;
523             }
524         } else if(!statsFlow.getMatch().equals(storedFlow.getMatch())) {
525             return false;
526         }
527         if (statsFlow.getCookie()== null) {
528             if (storedFlow.getCookie()!= null) {
529                 return false;
530             }
531         } else if(!statsFlow.getCookie().equals(storedFlow.getCookie())) {
532             return false;
533         }
534         if (statsFlow.getHardTimeout() == null) {
535             if (storedFlow.getHardTimeout() != null) {
536                 return false;
537             }
538         } else if(!statsFlow.getHardTimeout().equals(storedFlow.getHardTimeout() )) {
539             return false;
540         }
541         if (statsFlow.getIdleTimeout()== null) {
542             if (storedFlow.getIdleTimeout() != null) {
543                 return false;
544             }
545         } else if(!statsFlow.getIdleTimeout().equals(storedFlow.getIdleTimeout())) {
546             return false;
547         }
548         if (statsFlow.getPriority() == null) {
549             if (storedFlow.getPriority() != null) {
550                 return false;
551             }
552         } else if(!statsFlow.getPriority().equals(storedFlow.getPriority())) {
553             return false;
554         }
555         if (statsFlow.getTableId() == null) {
556             if (storedFlow.getTableId() != null) {
557                 return false;
558             }
559         } else if(!statsFlow.getTableId().equals(storedFlow.getTableId())) {
560             return false;
561         }
562         return true;
563     }
564
565 }