MD-SAL Statistics Manager - Adding support for individual flow stats 08/3708/2
authorAnil Vishnoi <avishnoi@in.ibm.com>
Tue, 10 Dec 2013 18:36:31 +0000 (00:06 +0530)
committerAnil Vishnoi <avishnoi@in.ibm.com>
Sat, 14 Dec 2013 02:20:06 +0000 (07:50 +0530)
and aggregate flow statistics.

Change-Id: I0aaa4a70dd8083ed7d6b39d06fe1b040510efbc2
Signed-off-by: Anil Vishnoi <avishnoi@in.ibm.com>
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/adsal/FlowStatisticsAdapter.java
opendaylight/md-sal/model/model-flow-base/src/main/yang/opendaylight-flow-types.yang
opendaylight/md-sal/model/model-flow-statistics/src/main/yang/flow-statistics.yang
opendaylight/md-sal/model/model-flow-statistics/src/main/yang/statistics-types.yang
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MultipartMessageManager.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatistics.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java

index f5c5620d7cccf3e023211d64629b1f77a55470c7..09585d6273a0c2cecd32e90d4f442313bf685946 100644 (file)
@@ -21,12 +21,22 @@ import org.opendaylight.controller.sal.reader.NodeDescription;
 import org.opendaylight.controller.sal.reader.NodeTableStatistics;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.Counter64;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsUpdatedBuilder;
 import org.opendaylight.controller.sal.reader.NodeTableStatistics;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.Counter64;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForGivenMatchInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllNodeConnectorStatisticsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllNodeConnectorStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllNodeConnectorStatisticsOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllNodeConnectorStatisticsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllNodeConnectorStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllNodeConnectorStatisticsOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsOutputBuilder;
@@ -292,4 +302,39 @@ public class FlowStatisticsAdapter implements OpendaylightFlowStatisticsService,
         return builder.build();
     }
 
         return builder.build();
     }
 
+    @Override
+    public Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> getAggregateFlowStatisticsFromFlowTableForAllFlows(
+            GetAggregateFlowStatisticsFromFlowTableForAllFlowsInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutput>> getAggregateFlowStatisticsFromFlowTableForGivenMatch(
+            GetAggregateFlowStatisticsFromFlowTableForGivenMatchInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<GetAllFlowStatisticsFromFlowTableOutput>> getAllFlowStatisticsFromFlowTable(
+            GetAllFlowStatisticsFromFlowTableInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> getAllFlowsStatisticsFromAllFlowTables(
+            GetAllFlowsStatisticsFromAllFlowTablesInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> getFlowStatisticsFromFlowTable(
+            GetFlowStatisticsFromFlowTableInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
 }
 }
index e83306db50db1155ab1d9fb7147fd71e2329cfff..6e87cc6f1c5642530c7175bcc7774da93824317e 100644 (file)
@@ -12,6 +12,9 @@ module opendaylight-flow-types {
         description "Initial revision of flow service";
     }
     
         description "Initial revision of flow service";
     }
     
+    typedef table-id {
+       type uint8;
+    }
     grouping instruction-list {
         list instruction {
             key "order";
     grouping instruction-list {
         list instruction {
             key "order";
index 87d1559e4921ffed91ad38c8a431ec78523fd4d2..80e02806e819c37896d83dbd40e50007681312f0 100644 (file)
@@ -3,28 +3,155 @@ module opendaylight-flow-statistics {
     prefix flowstat;
 
     import yang-ext {prefix ext; revision-date "2013-07-09";}
     prefix flowstat;
 
     import yang-ext {prefix ext; revision-date "2013-07-09";}
+    import ietf-yang-types {prefix yang; revision-date "2010-09-24";}   
     import opendaylight-inventory {prefix inv;revision-date "2013-08-19";}
     import opendaylight-flow-types {prefix flow-types;revision-date "2013-10-26";}
     import opendaylight-inventory {prefix inv;revision-date "2013-08-19";}
     import opendaylight-flow-types {prefix flow-types;revision-date "2013-10-26";}
-    import sal-flow {prefix flow;}
     import opendaylight-statistics-types {prefix stat-types;revision-date "2013-09-25";}
     import opendaylight-statistics-types {prefix stat-types;revision-date "2013-09-25";}
+    import flow-node-inventory {prefix flow-node;revision-date "2013-08-19";}
+    import flow-capable-transaction {prefix tr;}
+    import sal-flow {prefix flow;}
+    
 
     revision "2013-08-19" {
 
     revision "2013-08-19" {
-        description "Initial revision of flow service";
+        description "Initial revision of flow statistics service";
+    }
+       
+       //Augment flow statistics data to the flow-capable-node->table->flow
+       augment "/inv:nodes/inv:node/flow-node:table/flow-node:flow" {
+        ext:augment-identifier "flow-statistics-data";
+        uses flow-statistics;
+    }
+       
+       grouping flow-statistics {
+        container flow-statistics {
+            //config "false";
+               uses flow-types:flow;
+               uses stat-types:generic-statistics;
+        }
+       }    
+       
+       typedef flow-id {
+               description "flow id";
+               type yang:counter32;
+       }
+       
+       grouping flow-and-statistics-map-list {
+               description "List of flow and statistics map";
+               list flow-and-statistics-map-list {
+                       key "flow-id";
+                       leaf flow-id {
+                               type flow-id;
+                       }
+                       uses flow-and-statistics-map;
+               }
+       }
+       
+       grouping flow-and-statistics-map{
+               description "Mapping between flow and its statistics";
+               uses flow-types:flow;
+               uses stat-types:generic-statistics;
+       }
+       
+    // RPC calls to fetch flow statistics
+    rpc get-all-flows-statistics-from-all-flow-tables {
+       description "Fetch statistics of all the flow present in all the flow tables of the switch"; 
+        input {
+            uses inv:node-context-ref;
+        }
+        output {
+            uses flow-and-statistics-map-list;
+            uses tr:transaction-aware;
+        }
+    
     }
 
     }
 
-    rpc get-node-connector-statistics {
+    rpc get-all-flow-statistics-from-flow-table {
+       description "Fetch statistics of all the flow present in the specific flow table of the switch"; 
         input {
             uses inv:node-context-ref;
         input {
             uses inv:node-context-ref;
-            leaf node-connector {
-                type inv:node-connector-ref;
+            leaf table-id {
+               type flow-types:table-id;
             }
         }
         output {
             }
         }
         output {
-            uses stat-types:node-connector-statistics;
+            uses flow-and-statistics-map-list;
+            uses tr:transaction-aware;
         }
     }
 
         }
     }
 
-    rpc get-flow-statistics {
+    rpc get-flow-statistics-from-flow-table {
+       description "Fetch statistics of the specific flow present in the specific flow table of the switch"; 
+        input {
+            uses inv:node-context-ref;
+            uses flow-types:flow;
+        }
+        output {
+            uses flow-and-statistics-map-list;
+            uses tr:transaction-aware;
+        }
+    }
+
+    notification flows-statistics-update {
+       description "Flows statistics sent by switch";
+               leaf moreReplies {
+            type boolean;
+        }
+        uses inv:node;
+               uses flow-and-statistics-map-list;
+               uses tr:transaction-aware;
+    }
+
+       //Models for aggregate flow statistics collection
+       augment "/inv:nodes/inv:node/flow-node:table" {
+        ext:augment-identifier "aggregate-flow-statistics-data";
+        uses aggregate-flow-statistics;
+    }
+       
+       grouping aggregate-flow-statistics {
+        container aggregate-flow-statistics {
+            //config "false";
+               uses stat-types:aggregate-flow-statistics;
+        }
+       }    
+       
+    // RPC calls to fetch flow statistics
+    rpc get-aggregate-flow-statistics-from-flow-table-for-all-flows {
+       description "Fetch aggregate statistics for all the flows present in the specific flow table of the switch"; 
+        input {
+            uses inv:node-context-ref;
+            leaf table-id {
+               type flow-types:table-id;
+            }
+        }
+        output {
+               uses stat-types:aggregate-flow-statistics;
+            uses tr:transaction-aware;
+        }
+    }
+    rpc get-aggregate-flow-statistics-from-flow-table-for-given-match {
+       description "Fetch aggregate statistics for all the flow matches to the given match from the given table of the switch"; 
+        input {
+            uses inv:node-context-ref;
+            uses flow-types:flow;
+        }
+        output {
+               uses stat-types:aggregate-flow-statistics;
+            uses tr:transaction-aware;
+        }
+    }
+
+    notification aggregate-flow-statistics-update {
+       description "Aggregate flow statistics for a table, sent by switch";
+               leaf moreReplies {
+            type boolean;
+        }
+        uses inv:node;
+               uses stat-types:aggregate-flow-statistics;
+               uses tr:transaction-aware;
+    }
+       
+       //Keeping flow statistics RPC call for backward compatibility for sal-compatibility layer --START
+       rpc get-flow-statistics {
         input {
             uses inv:node-context-ref;
             uses flow-types:flow;
         input {
             uses inv:node-context-ref;
             uses flow-types:flow;
@@ -45,6 +172,25 @@ module opendaylight-flow-statistics {
         }
     }
 
         }
     }
 
+    notification flow-statistics-updated {
+        uses flow-types:flow-statistics;
+    }
+       
+       //Keeping flow statistics RPC call for backward compatibility for sal-compatibility layer --END
+       
+       //RPC call to fetch node connector statistics
+    rpc get-node-connector-statistics {
+        input {
+            uses inv:node-context-ref;
+            leaf node-connector {
+                type inv:node-connector-ref;
+            }
+        }
+        output {
+            uses stat-types:node-connector-statistics;
+        }
+    }
+
     rpc get-all-node-connector-statistics {
         input {
             uses inv:node-context-ref;
     rpc get-all-node-connector-statistics {
         input {
             uses inv:node-context-ref;
@@ -56,10 +202,6 @@ module opendaylight-flow-statistics {
         }
     }
 
         }
     }
 
-    notification flow-statistics-updated {
-        uses flow-types:flow-statistics;
-    }
-
     rpc get-flow-table-statistics {
         input {
             uses inv:node-context-ref;
     rpc get-flow-table-statistics {
         input {
             uses inv:node-context-ref;
@@ -79,6 +221,4 @@ module opendaylight-flow-statistics {
     notification node-connector-statistics-updated {
         uses stat-types:node-connector-statistics;
     }
     notification node-connector-statistics-updated {
         uses stat-types:node-connector-statistics;
     }
-
-
 }
 }
index d0b2e6a95931f8abdebf7f0f0c7353791a49b481..ff35c595e26bd56b5fa1adde4833913436ec628f 100644 (file)
@@ -60,4 +60,39 @@ module opendaylight-statistics-types {
             }
         }
     }
             }
         }
     }
+    
+    grouping generic-statistics {
+       description "Generic grouping for statistics";
+        leaf packet-count {
+            type yang:counter64;
+        }
+
+        leaf byte-count {
+            type yang:counter64;
+        }
+
+        container duration {
+            leaf second {
+                type yang:counter64;
+            }
+            leaf nanosecond {
+                type yang:counter64;
+            }
+        }
+    }
+    
+    grouping aggregate-flow-statistics {
+       description "Aggregate flow statistics";
+        leaf packet-count {
+            type yang:counter64;
+        }
+
+        leaf byte-count {
+            type yang:counter64;
+        }
+        leaf flow-count {
+            type yang:counter32;
+        }
+    }
+    
 }
\ No newline at end of file
 }
\ No newline at end of file
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MultipartMessageManager.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MultipartMessageManager.java
new file mode 100644 (file)
index 0000000..11cce72
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright IBM Corporation, 2013.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
+
+/**
+ * Main responsibility of the class is to manage multipart response 
+ * for multipart request. It also handles the flow aggregate request
+ * and response mapping. 
+ * @author avishnoi@in.ibm.com
+ *
+ */
+public class MultipartMessageManager {
+
+    private static Map<TransactionId,Short> txIdTotableIdMap = new ConcurrentHashMap<TransactionId,Short>();
+    
+    public MultipartMessageManager(){}
+    
+    public Short getTableIdForTxId(TransactionId id){
+        
+        return txIdTotableIdMap.get(id);
+        
+    }
+    
+    public void setTxIdAndTableIdMapEntry(TransactionId id,Short tableId){
+        txIdTotableIdMap.put(id, tableId);
+    }
+}
index c48ac311abb33dc0ffdfe5ee1ca4c3f26595111b..9c457f6df287ad7cb13d8866412e2ae1e9714964 100644 (file)
@@ -7,8 +7,12 @@
  */
 package org.opendaylight.controller.md.statistics.manager;
 
  */
 package org.opendaylight.controller.md.statistics.manager;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.List;
+import java.util.Map;
 
 
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatistics;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats;
@@ -16,6 +20,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
 
 public class NodeStatistics {
 
 
 public class NodeStatistics {
 
@@ -33,10 +38,14 @@ public class NodeStatistics {
     
     private MeterFeatures meterFeatures;
     
     
     private MeterFeatures meterFeatures;
     
-    public NodeStatistics(){
-        
-    }
-
+    private final Map<Short,Map<Flow,GenericStatistics>> flowAndStatsMap= 
+            new HashMap<Short,Map<Flow,GenericStatistics>>();
+    
+    private final Map<Short,AggregateFlowStatistics> tableAndAggregateFlowStatsMap = 
+            new HashMap<Short,AggregateFlowStatistics>();
+    
+    public NodeStatistics(){}     
+    
     public NodeRef getTargetNode() {
         return targetNode;
     }
     public NodeRef getTargetNode() {
         return targetNode;
     }
@@ -92,5 +101,13 @@ public class NodeStatistics {
     public void setMeterFeatures(MeterFeatures meterFeatures) {
         this.meterFeatures = meterFeatures;
     }
     public void setMeterFeatures(MeterFeatures meterFeatures) {
         this.meterFeatures = meterFeatures;
     }
+
+    public Map<Short,Map<Flow,GenericStatistics>> getFlowAndStatsMap() {
+        return flowAndStatsMap;
+    }
+
+    public Map<Short, AggregateFlowStatistics> getTableAndAggregateFlowStatsMap() {
+        return tableAndAggregateFlowStatsMap;
+    }
     
 }
     
 }
index f22ca00b2ab4d0dc20d551aa08d58522848923a7..061e633e089b100d1fe5c4a9d5d8c6f296f426cd 100644 (file)
@@ -7,9 +7,11 @@
  */
 package org.opendaylight.controller.md.statistics.manager;
 
  */
 package org.opendaylight.controller.md.statistics.manager;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import org.eclipse.xtext.xbase.lib.Exceptions;
 import java.util.concurrent.Future;
 
 import org.eclipse.xtext.xbase.lib.Exceptions;
@@ -17,6 +19,13 @@ import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.TableId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
@@ -26,6 +35,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
@@ -50,6 +60,10 @@ public class StatisticsProvider implements AutoCloseable {
     
     private OpendaylightMeterStatisticsService meterStatsService;
     
     
     private OpendaylightMeterStatisticsService meterStatsService;
     
+    private OpendaylightFlowStatisticsService flowStatsService;
+    
+    private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
+    
     private Thread statisticsRequesterThread;
     
     private final  InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
     private Thread statisticsRequesterThread;
     
     private final  InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
@@ -76,6 +90,10 @@ public class StatisticsProvider implements AutoCloseable {
       this.nps = notificationService;
     }
 
       this.nps = notificationService;
     }
 
+    public MultipartMessageManager getMultipartMessageManager() {
+        return multipartMessageManager;
+    }
+
     private final StatisticsUpdateCommiter updateCommiter = new StatisticsUpdateCommiter(StatisticsProvider.this);
     
     private Registration<NotificationListener> listenerRegistration;
     private final StatisticsUpdateCommiter updateCommiter = new StatisticsUpdateCommiter(StatisticsProvider.this);
     
     private Registration<NotificationListener> listenerRegistration;
@@ -92,6 +110,9 @@ public class StatisticsProvider implements AutoCloseable {
         
         meterStatsService = StatisticsManagerActivator.getProviderContext().
                 getRpcService(OpendaylightMeterStatisticsService.class);
         
         meterStatsService = StatisticsManagerActivator.getProviderContext().
                 getRpcService(OpendaylightMeterStatisticsService.class);
+        
+        flowStatsService = StatisticsManagerActivator.getProviderContext().
+                getRpcService(OpendaylightFlowStatisticsService.class);
 
         statisticsRequesterThread = new Thread( new Runnable(){
 
 
         statisticsRequesterThread = new Thread( new Runnable(){
 
@@ -131,14 +152,24 @@ public class StatisticsProvider implements AutoCloseable {
             return;
 
         for (Node targetNode : targetNodes){
             return;
 
         for (Node targetNode : targetNodes){
+            
+            InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
+            NodeRef targetNodeRef = new NodeRef(targetInstanceId);
+            
+            try {
+                
+                sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
+
+                sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
+
+            }catch(Exception e){
+                spLogger.error("Exception occured while sending flow statistics request : {}",e);
+            }
 
             if(targetNode.getAugmentation(FlowCapableNode.class) != null){
 
                 spLogger.info("Send request for stats collection to node : {})",targetNode.getId());
                 
 
             if(targetNode.getAugmentation(FlowCapableNode.class) != null){
 
                 spLogger.info("Send request for stats collection to node : {})",targetNode.getId());
                 
-                InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
-                NodeRef targetNodeRef = new NodeRef(targetInstanceId);
-                
                 try{
                   sendAllGroupStatisticsRequest(targetNodeRef);
                   Thread.sleep(1000);
                 try{
                   sendAllGroupStatisticsRequest(targetNodeRef);
                   Thread.sleep(1000);
@@ -155,11 +186,57 @@ public class StatisticsProvider implements AutoCloseable {
         }
     }
     
         }
     }
     
+    private void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode){
+        final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
+                new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
+        
+        input.setNode(targetNode);
+        
+        @SuppressWarnings("unused")
+        Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response = 
+                flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
+        
+    }
+    
+    private void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
+        
+        List<Short> tablesId = getTablesFromNode(targetNodeKey);
+        
+        if(tablesId.size() != 0){
+            for(Short id : tablesId){
+                
+                spLogger.info("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey);
+                GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = 
+                        new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
+                
+                input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
+                input.setTableId(new TableId(id));
+                Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response = 
+                        flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
+                
+                multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), id);
+            }
+        }
+        
+        //Note: Just for testing, because i am not able to fetch table list from datastore
+        // Bug-225 is raised for investigation.
+        
+//                spLogger.info("Send aggregate stats request for flow table {} to node {}",1,targetNodeKey);
+//                GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = 
+//                        new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
+//                
+//                input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
+//                input.setTableId(new TableId((short)1));
+//                Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response = 
+//                        flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());`
+//                
+//                multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), (short)1);
+    }
+
     private void sendAllGroupStatisticsRequest(NodeRef targetNode){
         
         final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
         
     private void sendAllGroupStatisticsRequest(NodeRef targetNode){
         
         final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
         
-        input.setNode(targetNode);
         input.setNode(targetNode);
 
         @SuppressWarnings("unused")
         input.setNode(targetNode);
 
         @SuppressWarnings("unused")
@@ -213,6 +290,20 @@ public class StatisticsProvider implements AutoCloseable {
         spLogger.info("Number of connected nodes : {}",nodes.getNode().size());
         return nodes.getNode();
     }
         spLogger.info("Number of connected nodes : {}",nodes.getNode().size());
         return nodes.getNode();
     }
+    
+    private List<Short> getTablesFromNode(NodeKey nodeKey){
+        InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
+        
+        FlowCapableNode node = (FlowCapableNode)dps.readConfigurationData(nodesIdentifier);
+        List<Short> tablesId = new ArrayList<Short>();
+        if(node != null && node.getTable()!=null){
+            spLogger.info("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
+            for(Table table: node.getTable()){
+                tablesId.add(table.getId());
+            }
+        }
+        return tablesId;
+    }
 
     @SuppressWarnings("deprecation")
     @Override
 
     @SuppressWarnings("deprecation")
     @Override
index 86e6114b5f67a9df25f02e8ae98677d0d842cc54..1686685cdaad23e6079087a02f2e8366f3173942 100644 (file)
@@ -7,9 +7,32 @@
  */
 package org.opendaylight.controller.md.statistics.manager;
 
  */
 package org.opendaylight.controller.md.statistics.manager;
 
+import java.util.HashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import java.util.concurrent.ConcurrentMap;
 
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowTableStatisticsUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.NodeConnectorStatisticsUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupDescStatsUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupFeaturesUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupStatisticsUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupDescStatsUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupFeaturesUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupStatisticsUpdated;
@@ -42,13 +65,21 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterConfigStatsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeaturesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterStatisticsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterConfigStatsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeaturesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsListener,
 
 public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsListener,
-        OpendaylightMeterStatisticsListener {
+        OpendaylightMeterStatisticsListener, 
+        OpendaylightFlowStatisticsListener{
     
     
+    public final static Logger sucLogger = LoggerFactory.getLogger(StatisticsUpdateCommiter.class);
+
     private final StatisticsProvider statisticsManager;
     private final StatisticsProvider statisticsManager;
+    
+    private final int unaccountedFlowsCounter = 1;
 
     public StatisticsUpdateCommiter(final StatisticsProvider manager){
 
 
     public StatisticsUpdateCommiter(final StatisticsProvider manager){
 
@@ -264,9 +295,271 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
         it.commit();
     }
 
         it.commit();
     }
 
+    @Override
+    public void onFlowsStatisticsUpdate(FlowsStatisticsUpdate notification) {
+        NodeKey key = new NodeKey(notification.getId());
+        sucLogger.info("Received flow stats update : {}",notification.toString());
+        
+        for(FlowAndStatisticsMapList map: notification.getFlowAndStatisticsMapList()){
+            short tableId = map.getTableId();
+            
+            DataModificationTransaction it = this.statisticsManager.startChange();
+
+            boolean foundOriginalFlow = false;
+
+            FlowBuilder flowBuilder = new FlowBuilder();
+
+            FlowStatisticsDataBuilder flowStatisticsData = new FlowStatisticsDataBuilder();
+
+            FlowBuilder flow = new FlowBuilder();
+            flow.setContainerName(map.getContainerName());
+            flow.setBufferId(map.getBufferId());
+            flow.setCookie(map.getCookie());
+            flow.setCookieMask(map.getCookieMask());
+            flow.setFlags(map.getFlags());
+            flow.setFlowName(map.getFlowName());
+            flow.setHardTimeout(map.getHardTimeout());
+            if(map.getFlowId() != null)
+                flow.setId(new FlowId(map.getFlowId().getValue()));
+            flow.setIdleTimeout(map.getIdleTimeout());
+            flow.setInstallHw(map.isInstallHw());
+            flow.setInstructions(map.getInstructions());
+            if(map.getFlowId()!= null)
+                flow.setKey(new FlowKey(new FlowId(map.getKey().getFlowId().getValue())));
+            flow.setMatch(map.getMatch());
+            flow.setOutGroup(map.getOutGroup());
+            flow.setOutPort(map.getOutPort());
+            flow.setPriority(map.getPriority());
+            flow.setStrict(map.isStrict());
+            flow.setTableId(tableId);
+                
+            Flow flowRule = flow.build();
+                
+            FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder();
+            stats.setByteCount(map.getByteCount());
+            stats.setPacketCount(map.getPacketCount());
+            stats.setDuration(map.getDuration());
+                
+            GenericStatistics flowStats = stats.build();
+                
+            //Add statistics to local cache
+            ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
+            if(!cache.containsKey(notification.getId())){
+                cache.put(notification.getId(), new NodeStatistics());
+            }
+            if(!cache.get(notification.getId()).getFlowAndStatsMap().containsKey(tableId)){
+                cache.get(notification.getId()).getFlowAndStatsMap().put(tableId, new HashMap<Flow,GenericStatistics>());
+            }
+            cache.get(notification.getId()).getFlowAndStatsMap().get(tableId).put(flowRule,flowStats);
+                
+            //Augment the data to the flow node
+
+            FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder();
+            flowStatistics.setByteCount(flowStats.getByteCount());
+            flowStatistics.setPacketCount(flowStats.getPacketCount());
+            flowStatistics.setDuration(flowStats.getDuration());
+            flowStatistics.setContainerName(map.getContainerName());
+            flowStatistics.setBufferId(map.getBufferId());
+            flowStatistics.setCookie(map.getCookie());
+            flowStatistics.setCookieMask(map.getCookieMask());
+            flowStatistics.setFlags(map.getFlags());
+            flowStatistics.setFlowName(map.getFlowName());
+            flowStatistics.setHardTimeout(map.getHardTimeout());
+            flowStatistics.setIdleTimeout(map.getIdleTimeout());
+            flowStatistics.setInstallHw(map.isInstallHw());
+            flowStatistics.setInstructions(map.getInstructions());
+            flowStatistics.setMatch(map.getMatch());
+            flowStatistics.setOutGroup(map.getOutGroup());
+            flowStatistics.setOutPort(map.getOutPort());
+            flowStatistics.setPriority(map.getPriority());
+            flowStatistics.setStrict(map.isStrict());
+            flowStatistics.setTableId(tableId);
+
+            flowStatisticsData.setFlowStatistics(flowStatistics.build());
+                
+            sucLogger.info("Flow : {}",flowRule.toString());
+            sucLogger.info("Statistics to augment : {}",flowStatistics.build().toString());
+
+            InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
+                    .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
+            
+            Table table= (Table)it.readConfigurationData(tableRef);
+
+            //TODO: Not a good way to do it, need to figure out better way.
+            //TODO: major issue in any alternate approach is that flow key is incrementally assigned 
+            //to the flows stored in data store.
+            if(table != null){
+
+                for(Flow existingFlow : table.getFlow()){
+                    sucLogger.debug("Existing flow in data store : {}",existingFlow.toString());
+                    if(flowEquals(flowRule,existingFlow)){
+                        InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
+                                .augmentation(FlowCapableNode.class)
+                                .child(Table.class, new TableKey(tableId))
+                                .child(Flow.class,existingFlow.getKey()).toInstance();
+                        flowBuilder.setKey(existingFlow.getKey());
+                        flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
+                        sucLogger.debug("Found matching flow in the datastore, augmenting statistics");
+                        foundOriginalFlow = true;
+                        it.putOperationalData(flowRef, flowBuilder.build());
+                        it.commit();
+                        break;
+                    }
+                }
+            }
+            
+            if(!foundOriginalFlow){
+                sucLogger.info("Associated original flow is not found in data store. Augmenting flow in operational data st");
+                //TODO: Temporary fix: format [ 0+tableid+0+unaccounted flow counter]
+                long flowKey = Long.getLong(new String("0"+Short.toString(tableId)+"0"+Integer.toString(this.unaccountedFlowsCounter)));
+                FlowKey newFlowKey = new FlowKey(new FlowId(flowKey));
+                InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
+                        .augmentation(FlowCapableNode.class)
+                        .child(Table.class, new TableKey(tableId))
+                        .child(Flow.class,newFlowKey).toInstance();
+                flowBuilder.setKey(newFlowKey);
+                flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
+                sucLogger.debug("Flow was no present in data store, augmenting statistics as an unaccounted flow");
+                it.putOperationalData(flowRef, flowBuilder.build());
+                it.commit();
+            }
+        }
+    }
+
+    @Override
+    public void onAggregateFlowStatisticsUpdate(AggregateFlowStatisticsUpdate notification) {
+        NodeKey key = new NodeKey(notification.getId());
+        sucLogger.info("Received aggregate flow statistics update : {}",notification.toString());
+        
+        Short tableId = this.statisticsManager.getMultipartMessageManager().getTableIdForTxId(notification.getTransactionId());
+        if(tableId != null){
+            
+            DataModificationTransaction it = this.statisticsManager.startChange();
+
+            InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
+                    .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
+
+            AggregateFlowStatisticsDataBuilder aggregateFlowStatisticsDataBuilder = new AggregateFlowStatisticsDataBuilder();
+            AggregateFlowStatisticsBuilder aggregateFlowStatisticsBuilder = new AggregateFlowStatisticsBuilder();
+            aggregateFlowStatisticsBuilder.setByteCount(notification.getByteCount());
+            aggregateFlowStatisticsBuilder.setFlowCount(notification.getFlowCount());
+            aggregateFlowStatisticsBuilder.setPacketCount(notification.getPacketCount());
+            aggregateFlowStatisticsDataBuilder.setAggregateFlowStatistics(aggregateFlowStatisticsBuilder.build());
+            
+            ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
+            if(!cache.containsKey(notification.getId())){
+                cache.put(notification.getId(), new NodeStatistics());
+            }
+            cache.get(notification.getId()).getTableAndAggregateFlowStatsMap().put(tableId,aggregateFlowStatisticsBuilder.build());
+            
+            sucLogger.info("Augment aggregate statistics: {} for table {} on Node {}",aggregateFlowStatisticsBuilder.build().toString(),tableId,key);
+
+            TableBuilder tableBuilder = new TableBuilder();
+            tableBuilder.setKey(new TableKey(tableId));
+            tableBuilder.addAugmentation(AggregateFlowStatisticsData.class, aggregateFlowStatisticsDataBuilder.build());
+            it.putOperationalData(tableRef, tableBuilder.build());
+            it.commit();
+
+        }
+    }
+
+    @Override
+    public void onFlowStatisticsUpdated(FlowStatisticsUpdated notification) {
+        // TODO Auto-generated method stub
+        //TODO: Depricated, will clean it up once sal-compatibility is fixed.
+        //Sal-Compatibility code usage this notification event.
+        
+    }
+
+    @Override
+    public void onFlowTableStatisticsUpdated(FlowTableStatisticsUpdated notification) {
+        // TODO Auto-generated method stub
+        //TODO: Need to implement it yet
+        
+    }
+
+    @Override
+    public void onNodeConnectorStatisticsUpdated(NodeConnectorStatisticsUpdated notification) {
+        // TODO Auto-generated method stub
+        //TODO: Need to implement it yet
+        
+    }
+
+
+
     private NodeRef getNodeRef(NodeKey nodeKey){
         InstanceIdentifierBuilder<?> builder = InstanceIdentifier.builder(Nodes.class).child(Node.class, nodeKey);
         return new NodeRef(builder.toInstance());
     }
     private NodeRef getNodeRef(NodeKey nodeKey){
         InstanceIdentifierBuilder<?> builder = InstanceIdentifier.builder(Nodes.class).child(Node.class, nodeKey);
         return new NodeRef(builder.toInstance());
     }
+    
+    public boolean flowEquals(Flow statsFlow, Flow storedFlow) {
+        if (statsFlow.getClass() != storedFlow.getClass()) {
+            return false;
+        }
+        if (statsFlow.getBufferId()== null) {
+            if (storedFlow.getBufferId() != null) {
+                return false;
+            }
+        } else if(!statsFlow.getBufferId().equals(storedFlow.getBufferId())) {
+            return false;
+        }
+        if (statsFlow.getContainerName()== null) {
+            if (storedFlow.getContainerName()!= null) {
+                return false;
+            }
+        } else if(!statsFlow.getContainerName().equals(storedFlow.getContainerName())) {
+            return false;
+        }
+        if (statsFlow.getCookie()== null) {
+            if (storedFlow.getCookie()!= null) {
+                return false;
+            }
+        } else if(!statsFlow.getCookie().equals(storedFlow.getCookie())) {
+            return false;
+        }
+        if (statsFlow.getMatch()== null) {
+            if (storedFlow.getMatch() != null) {
+                return false;
+            }
+        } else if(!statsFlow.getMatch().equals(storedFlow.getMatch())) {
+            return false;
+        }
+        if (statsFlow.getCookie()== null) {
+            if (storedFlow.getCookie()!= null) {
+                return false;
+            }
+        } else if(!statsFlow.getCookie().equals(storedFlow.getCookie())) {
+            return false;
+        }
+        if (statsFlow.getHardTimeout() == null) {
+            if (storedFlow.getHardTimeout() != null) {
+                return false;
+            }
+        } else if(!statsFlow.getHardTimeout().equals(storedFlow.getHardTimeout() )) {
+            return false;
+        }
+        if (statsFlow.getIdleTimeout()== null) {
+            if (storedFlow.getIdleTimeout() != null) {
+                return false;
+            }
+        } else if(!statsFlow.getIdleTimeout().equals(storedFlow.getIdleTimeout())) {
+            return false;
+        }
+        if (statsFlow.getPriority() == null) {
+            if (storedFlow.getPriority() != null) {
+                return false;
+            }
+        } else if(!statsFlow.getPriority().equals(storedFlow.getPriority())) {
+            return false;
+        }
+        if (statsFlow.getTableId() == null) {
+            if (storedFlow.getTableId() != null) {
+                return false;
+            }
+        } else if(!statsFlow.getTableId().equals(storedFlow.getTableId())) {
+            return false;
+        }
+        return true;
+    }
 
 }
 
 }