Gerrit contains following changes : 83/4583/2
authorAnil Vishnoi <avishnoi@in.ibm.com>
Tue, 21 Jan 2014 10:34:25 +0000 (16:04 +0530)
committerEd Warnicke <eaw@cisco.com>
Thu, 23 Jan 2014 02:37:52 +0000 (02:37 +0000)
1) Fix for bug 284. Added functionality that listen to data store *remove* changes and
cleanup the relevant statistics from operational data store.
2) Added functionality that listen to config data store *create* changes for Flow,Group,
Meter & Queue and send statistics request to switch to get respective stats.
3) Added functionality to periodically remove stale stats from operational data store.
Clean up thread invokes after every two cycle of stats collection.
4) Removed unnecessary local caching.

Change-Id: Ibee3c73905ce872302c4f54ce5b7b53c0657ee51
Signed-off-by: Anil Vishnoi <avishnoi@in.ibm.com>
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatistics.java [deleted file]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsAger.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManagerActivator.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateHandler.java [new file with mode: 0644]

diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatistics.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatistics.java
deleted file mode 100644 (file)
index e84b437..0000000
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Copyright IBM Corporation, 2013.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.statistics.manager;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeatures;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeatures;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericQueueStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericTableStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.NodeConnectorStatistics;
-
-public class NodeStatistics {
-
-    private NodeRef targetNode;
-    
-    private List<GroupStats> groupStatistics;
-    
-    private List<MeterStats> meterStatistics;
-    
-    private List<GroupDescStats> groupDescStats;
-    
-    private List<MeterConfigStats> meterConfigStats;
-    
-    private GroupFeatures groupFeatures;
-    
-    private MeterFeatures meterFeatures;
-    
-    private final Map<Short,Map<Flow,GenericStatistics>> flowAndStatsMap= 
-            new HashMap<Short,Map<Flow,GenericStatistics>>();
-    
-    private final Map<Short,AggregateFlowStatistics> tableAndAggregateFlowStatsMap = 
-            new HashMap<Short,AggregateFlowStatistics>();
-    
-    private final Map<NodeConnectorId,NodeConnectorStatistics> nodeConnectorStats = 
-            new ConcurrentHashMap<NodeConnectorId,NodeConnectorStatistics>();
-    
-    private final Map<Short,GenericTableStatistics> flowTableAndStatisticsMap = 
-            new HashMap<Short,GenericTableStatistics>();
-    
-    private final Map<NodeConnectorId,Map<QueueId,GenericQueueStatistics>> NodeConnectorAndQueuesStatsMap = 
-            new HashMap<NodeConnectorId,Map<QueueId,GenericQueueStatistics>>();
-    
-    public NodeStatistics(){
-        
-    }
-
-    public NodeRef getTargetNode() {
-        return targetNode;
-    }
-
-    public void setTargetNode(NodeRef targetNode) {
-        this.targetNode = targetNode;
-    }
-
-    public List<GroupStats> getGroupStatistics() {
-        return groupStatistics;
-    }
-
-    public void setGroupStatistics(List<GroupStats> groupStatistics) {
-        this.groupStatistics = groupStatistics;
-    }
-
-    public List<MeterStats> getMeterStatistics() {
-        return meterStatistics;
-    }
-
-    public void setMeterStatistics(List<MeterStats> meterStatistics) {
-        this.meterStatistics = meterStatistics;
-    }
-
-    public List<GroupDescStats> getGroupDescStats() {
-        return groupDescStats;
-    }
-
-    public void setGroupDescStats(List<GroupDescStats> groupDescStats) {
-        this.groupDescStats = groupDescStats;
-    }
-
-    public List<MeterConfigStats> getMeterConfigStats() {
-        return meterConfigStats;
-    }
-
-    public void setMeterConfigStats(List<MeterConfigStats> meterConfigStats) {
-        this.meterConfigStats = meterConfigStats;
-    }
-
-    public GroupFeatures getGroupFeatures() {
-        return groupFeatures;
-    }
-
-    public void setGroupFeatures(GroupFeatures groupFeatures) {
-        this.groupFeatures = groupFeatures;
-    }
-
-    public MeterFeatures getMeterFeatures() {
-        return meterFeatures;
-    }
-
-    public void setMeterFeatures(MeterFeatures meterFeatures) {
-        this.meterFeatures = meterFeatures;
-    }
-
-    public Map<Short,Map<Flow,GenericStatistics>> getFlowAndStatsMap() {
-        return flowAndStatsMap;
-    }
-
-    public Map<Short, GenericTableStatistics> getFlowTableAndStatisticsMap() {
-        return flowTableAndStatisticsMap;
-    }
-
-    public Map<Short, AggregateFlowStatistics> getTableAndAggregateFlowStatsMap() {
-        return tableAndAggregateFlowStatsMap;
-    }
-    public Map<NodeConnectorId, NodeConnectorStatistics> getNodeConnectorStats() {
-        return nodeConnectorStats;
-    }
-
-    public Map<NodeConnectorId, Map<QueueId, GenericQueueStatistics>> getNodeConnectorAndQueuesStatsMap() {
-        return NodeConnectorAndQueuesStatsMap;
-    }
-}
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsAger.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsAger.java
new file mode 100644 (file)
index 0000000..4ecd620
--- /dev/null
@@ -0,0 +1,346 @@
+/*
+ * Copyright IBM Corporation, 2013.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
+
+/**
+ * Main responsibility of this class to clean up all the stale statistics data
+ * associated to Flow,Meter,Group,Queue.
+ * @author avishnoi@in.ibm.com
+ *
+ */
+public class NodeStatisticsAger {
+    
+    private final int NUMBER_OF_WAIT_CYCLES =2;
+
+    private final StatisticsProvider statisticsProvider;
+
+    private final NodeKey targetNodeKey;
+    
+    private final Map<GroupDescStats,Date> groupDescStatsUpdate
+                = new ConcurrentHashMap<GroupDescStats,Date>();
+    
+    private final Map<MeterConfigStats,Date> meterConfigStatsUpdate
+                = new ConcurrentHashMap<MeterConfigStats,Date>();
+    
+    private final Map<FlowEntry,Date> flowStatsUpdate
+                = new ConcurrentHashMap<FlowEntry,Date>();
+    
+    private final Map<QueueEntry,Date> queuesStatsUpdate 
+                = new ConcurrentHashMap<QueueEntry,Date>();
+    
+    public NodeStatisticsAger(StatisticsProvider statisticsProvider, NodeKey nodeKey){
+        this.targetNodeKey = nodeKey;
+        this.statisticsProvider = statisticsProvider;
+    }
+
+    public class FlowEntry{
+        private final Short tableId;
+        private final Flow flow;
+        
+        public FlowEntry(Short tableId, Flow flow){
+            this.tableId = tableId;
+            this.flow = flow;
+        }
+
+        public Short getTableId() {
+            return tableId;
+        }
+
+        public Flow getFlow() {
+            return flow;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + getOuterType().hashCode();
+            result = prime * result + ((flow == null) ? 0 : flow.hashCode());
+            result = prime * result + ((tableId == null) ? 0 : tableId.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null)
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            FlowEntry other = (FlowEntry) obj;
+            if (!getOuterType().equals(other.getOuterType()))
+                return false;
+            if (flow == null) {
+                if (other.flow != null)
+                    return false;
+            } else if (!flow.equals(other.flow))
+                return false;
+            if (tableId == null) {
+                if (other.tableId != null)
+                    return false;
+            } else if (!tableId.equals(other.tableId))
+                return false;
+            return true;
+        }
+
+        private NodeStatisticsAger getOuterType() {
+            return NodeStatisticsAger.this;
+        }
+        
+    }
+    
+    public class QueueEntry{
+        private final NodeConnectorId nodeConnectorId;
+        private final QueueId queueId;
+        public QueueEntry(NodeConnectorId ncId, QueueId queueId){
+            this.nodeConnectorId = ncId;
+            this.queueId = queueId;
+        }
+        public NodeConnectorId getNodeConnectorId() {
+            return nodeConnectorId;
+        }
+        public QueueId getQueueId() {
+            return queueId;
+        }
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + getOuterType().hashCode();
+            result = prime * result + ((nodeConnectorId == null) ? 0 : nodeConnectorId.hashCode());
+            result = prime * result + ((queueId == null) ? 0 : queueId.hashCode());
+            return result;
+        }
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (!(obj instanceof QueueEntry)) {
+                return false;
+            }
+            QueueEntry other = (QueueEntry) obj;
+            if (!getOuterType().equals(other.getOuterType())) {
+                return false;
+            }
+            if (nodeConnectorId == null) {
+                if (other.nodeConnectorId != null) {
+                    return false;
+                }
+            } else if (!nodeConnectorId.equals(other.nodeConnectorId)) {
+                return false;
+            }
+            if (queueId == null) {
+                if (other.queueId != null) {
+                    return false;
+                }
+            } else if (!queueId.equals(other.queueId)) {
+                return false;
+            }
+            return true;
+        }
+        private NodeStatisticsAger getOuterType() {
+            return NodeStatisticsAger.this;
+        }
+    }
+    
+    public NodeKey getTargetNodeKey() {
+        return targetNodeKey;
+    }
+
+    public Map<GroupDescStats, Date> getGroupDescStatsUpdate() {
+        return groupDescStatsUpdate;
+    }
+
+    public Map<MeterConfigStats, Date> getMeterConfigStatsUpdate() {
+        return meterConfigStatsUpdate;
+    }
+
+    public Map<FlowEntry, Date> getFlowStatsUpdate() {
+        return flowStatsUpdate;
+    }
+
+    public Map<QueueEntry, Date> getQueuesStatsUpdate() {
+        return queuesStatsUpdate;
+    }
+
+    public void updateGroupDescStats(List<GroupDescStats> list){
+        Date expiryTime = getExpiryTime();
+        for(GroupDescStats groupDescStats : list)
+            this.groupDescStatsUpdate.put(groupDescStats, expiryTime);
+    }
+    
+    public void updateMeterConfigStats(List<MeterConfigStats> list){
+        Date expiryTime = getExpiryTime();
+        for(MeterConfigStats meterConfigStats: list)
+            this.meterConfigStatsUpdate.put(meterConfigStats, expiryTime);
+    }
+    
+    public void  updateFlowStats(FlowEntry flowEntry){
+        this.flowStatsUpdate.put(flowEntry, getExpiryTime());
+    }
+    public void updateQueueStats(QueueEntry queueEntry){
+        this.queuesStatsUpdate.put(queueEntry, getExpiryTime());
+    }
+    
+    private Date getExpiryTime(){
+        Date expires = new Date();
+        expires.setTime(expires.getTime()+StatisticsProvider.STATS_THREAD_EXECUTION_TIME*NUMBER_OF_WAIT_CYCLES);
+        return expires;
+    }
+    
+    public void cleanStaleStatistics(){
+        //Clean stale statistics related to group 
+        for (Iterator<GroupDescStats> it = this.groupDescStatsUpdate.keySet().iterator();it.hasNext();){
+            GroupDescStats groupDescStats = it.next();
+            Date now = new Date();
+            Date expiryTime = this.groupDescStatsUpdate.get(groupDescStats);
+            if(now.after(expiryTime)){
+                cleanGroupStatsFromDataStore(groupDescStats );
+                it.remove();
+            }
+        }
+        
+        //Clean stale statistics related to meter 
+        for (Iterator<MeterConfigStats> it = this.meterConfigStatsUpdate.keySet().iterator();it.hasNext();){
+            MeterConfigStats meterConfigStats = it.next();
+            Date now = new Date();
+            Date expiryTime = this.meterConfigStatsUpdate.get(meterConfigStats);
+            if(now.after(expiryTime)){
+                cleanMeterStatsFromDataStore(meterConfigStats);
+                it.remove();
+            }            
+        }
+        
+        //Clean stale statistics related to flow 
+        for (Iterator<FlowEntry> it = this.flowStatsUpdate.keySet().iterator();it.hasNext();){
+            FlowEntry flowEntry = it.next();
+            Date now = new Date();
+            Date expiryTime = this.flowStatsUpdate.get(flowEntry);
+            if(now.after(expiryTime)){
+                cleanFlowStatsFromDataStore(flowEntry);
+                it.remove();
+            }            
+        }
+
+        //Clean stale statistics related to queue
+        for (Iterator<QueueEntry> it = this.queuesStatsUpdate.keySet().iterator();it.hasNext();){
+            QueueEntry queueEntry = it.next();
+            Date now = new Date();
+            Date expiryTime = this.queuesStatsUpdate.get(queueEntry);
+            if(now.after(expiryTime)){
+                cleanQueueStatsFromDataStore(queueEntry);
+                it.remove();
+            }            
+        }
+        
+    }
+
+    private void cleanQueueStatsFromDataStore(QueueEntry queueEntry) {
+        InstanceIdentifier<?> queueRef 
+                        = InstanceIdentifier.builder(Nodes.class)
+                                            .child(Node.class, this.targetNodeKey)
+                                            .child(NodeConnector.class, new NodeConnectorKey(queueEntry.getNodeConnectorId()))
+                                            .augmentation(FlowCapableNodeConnector.class)
+                                            .child(Queue.class, new QueueKey(queueEntry.getQueueId()))
+                                            .augmentation(FlowCapableNodeConnectorQueueStatisticsData.class).toInstance();
+        cleanStaleStatisticsFromDataStore(queueRef);
+    }
+
+    private void cleanFlowStatsFromDataStore(FlowEntry flowEntry) {
+        InstanceIdentifier<?> flowRef 
+                        = InstanceIdentifier.builder(Nodes.class).child(Node.class, this.targetNodeKey)
+                                            .augmentation(FlowCapableNode.class)
+                                            .child(Table.class, new TableKey(flowEntry.getTableId()))
+                                            .child(Flow.class,flowEntry.getFlow().getKey())
+                                            .augmentation(FlowStatisticsData.class).toInstance();
+        
+        cleanStaleStatisticsFromDataStore(flowRef);
+        
+    }
+
+    private void cleanMeterStatsFromDataStore(MeterConfigStats meterConfigStats) {
+        InstanceIdentifierBuilder<Meter> meterRef 
+                        = InstanceIdentifier.builder(Nodes.class).child(Node.class,this.targetNodeKey)
+                                            .augmentation(FlowCapableNode.class)
+                                            .child(Meter.class,new MeterKey(meterConfigStats.getMeterId()));
+        
+        InstanceIdentifier<?> nodeMeterConfigStatsAugmentation = meterRef.augmentation(NodeMeterConfigStats.class).toInstance();
+                                            
+        cleanStaleStatisticsFromDataStore(nodeMeterConfigStatsAugmentation);
+        
+        InstanceIdentifier<?> nodeMeterStatisticsAugmentation = meterRef.augmentation(NodeMeterStatistics.class).toInstance();
+        
+        cleanStaleStatisticsFromDataStore(nodeMeterStatisticsAugmentation);
+        
+    }
+
+    private void cleanGroupStatsFromDataStore(GroupDescStats groupDescStats) {
+        InstanceIdentifierBuilder<Group> groupRef 
+                        = InstanceIdentifier.builder(Nodes.class).child(Node.class,this.targetNodeKey)
+                                            .augmentation(FlowCapableNode.class)
+                                            .child(Group.class,new GroupKey(groupDescStats.getGroupId()));
+        
+        InstanceIdentifier<?> nodeGroupDescStatsAugmentation = groupRef.augmentation(NodeGroupDescStats.class).toInstance();
+        
+        cleanStaleStatisticsFromDataStore(nodeGroupDescStatsAugmentation);
+
+        InstanceIdentifier<?> nodeGroupStatisticsAugmentation = groupRef.augmentation(NodeGroupStatistics.class).toInstance();
+        
+        cleanStaleStatisticsFromDataStore(nodeGroupStatisticsAugmentation);
+    }
+    
+    private void cleanStaleStatisticsFromDataStore(InstanceIdentifier<? extends DataObject> ii){
+        if(ii != null){
+            DataModificationTransaction it = this.statisticsProvider.startChange();
+            it.removeOperationalData(ii);
+            it.commit();
+        }
+    }
+}
index 738c2cb9a8f6375d8681a5e824b0d2f953d1a211..653cc8081ab8a953463760560b54cf1bfd197894 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.md.statistics.manager;
 import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareProvider;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareProvider;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.osgi.framework.BundleContext;
 
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.osgi.framework.BundleContext;
 
@@ -26,6 +27,8 @@ public class StatisticsManagerActivator extends AbstractBindingAwareProvider {
         pSession = session;
         DataProviderService dps = session.<DataProviderService>getSALService(DataProviderService.class);
         StatisticsManagerActivator.statsProvider.setDataService(dps);
         pSession = session;
         DataProviderService dps = session.<DataProviderService>getSALService(DataProviderService.class);
         StatisticsManagerActivator.statsProvider.setDataService(dps);
+        DataBrokerService dbs = session.<DataBrokerService>getSALService(DataBrokerService.class);
+        StatisticsManagerActivator.statsProvider.setDataBrokerService(dbs);
         NotificationProviderService nps = session.<NotificationProviderService>getSALService(NotificationProviderService.class);
         StatisticsManagerActivator.statsProvider.setNotificationService(nps);
         StatisticsManagerActivator.statsProvider.start();
         NotificationProviderService nps = session.<NotificationProviderService>getSALService(NotificationProviderService.class);
         StatisticsManagerActivator.statsProvider.setNotificationService(nps);
         StatisticsManagerActivator.statsProvider.start();
index 4eaad427381cac032d6e6b48de81d2da0f5e3445..7b7403f1c327f6492d81e0a1980510025b46e37e 100644 (file)
@@ -17,26 +17,37 @@ import java.util.concurrent.Future;
 import org.eclipse.xtext.xbase.lib.Exceptions;
 import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.eclipse.xtext.xbase.lib.Exceptions;
 import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
@@ -49,28 +60,45 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.G
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
 import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.NotificationListener;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.NotificationListener;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/** 
+ * Following are main responsibilities of the class:
+ * 1) Invoke statistics request thread to send periodic statistics request to all the 
+ * flow capable switch connected to the controller. It sends statistics request for 
+ * Group,Meter,Table,Flow,Queue,Aggregate stats.   
+ * 
+ * 2) Invoke statistics ager thread, to clean up all the stale statistics data from 
+ * operational data store.
+ * 
+ * @author avishnoi@in.ibm.com
+ *
+ */
 public class StatisticsProvider implements AutoCloseable {
 
     public final static Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
 public class StatisticsProvider implements AutoCloseable {
 
     public final static Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
-
+    
     private DataProviderService dps;
     private DataProviderService dps;
+    
+    private DataBrokerService dbs;
 
     private NotificationProviderService nps;
 
     private NotificationProviderService nps;
-
+    
     private OpendaylightGroupStatisticsService groupStatsService;
     private OpendaylightGroupStatisticsService groupStatsService;
-
+    
     private OpendaylightMeterStatisticsService meterStatsService;
     private OpendaylightMeterStatisticsService meterStatsService;
-
+    
     private OpendaylightFlowStatisticsService flowStatsService;
     private OpendaylightFlowStatisticsService flowStatsService;
-
+    
     private OpendaylightPortStatisticsService portStatsService;
 
     private OpendaylightFlowTableStatisticsService flowTableStatsService;
     private OpendaylightPortStatisticsService portStatsService;
 
     private OpendaylightFlowTableStatisticsService flowTableStatsService;
@@ -78,29 +106,41 @@ public class StatisticsProvider implements AutoCloseable {
     private OpendaylightQueueStatisticsService queueStatsService;
 
     private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
     private OpendaylightQueueStatisticsService queueStatsService;
 
     private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
-
+    
+    private StatisticsUpdateHandler statsUpdateHandler;
+    
     private Thread statisticsRequesterThread;
     private Thread statisticsRequesterThread;
+    
+    private Thread statisticsAgerThread;
 
     private final  InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
 
     private final  InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
-
-    private final int STATS_THREAD_EXECUTION_TIME= 50000;
+    
+    public static final int STATS_THREAD_EXECUTION_TIME= 30000;
     //Local caching of stats
     //Local caching of stats
-
-    private final ConcurrentMap<NodeId,NodeStatistics> statisticsCache =
-            new ConcurrentHashMap<NodeId,NodeStatistics>();
-
+    
+    private final ConcurrentMap<NodeId,NodeStatisticsAger> statisticsCache = 
+            new ConcurrentHashMap<NodeId,NodeStatisticsAger>();
+    
     public DataProviderService getDataService() {
       return this.dps;
     }
     public DataProviderService getDataService() {
       return this.dps;
     }
-
+    
     public void setDataService(final DataProviderService dataService) {
       this.dps = dataService;
     }
     public void setDataService(final DataProviderService dataService) {
       this.dps = dataService;
     }
+    
+    public DataBrokerService getDataBrokerService() {
+        return this.dbs;
+    }
+      
+    public void setDataBrokerService(final DataBrokerService dataBrokerService) {
+        this.dbs = dataBrokerService;
+    }
 
     public NotificationProviderService getNotificationService() {
       return this.nps;
     }
 
     public NotificationProviderService getNotificationService() {
       return this.nps;
     }
-
+    
     public void setNotificationService(final NotificationProviderService notificationService) {
       this.nps = notificationService;
     }
     public void setNotificationService(final NotificationProviderService notificationService) {
       this.nps = notificationService;
     }
@@ -110,22 +150,26 @@ public class StatisticsProvider implements AutoCloseable {
     }
 
     private final StatisticsUpdateCommiter updateCommiter = new StatisticsUpdateCommiter(StatisticsProvider.this);
     }
 
     private final StatisticsUpdateCommiter updateCommiter = new StatisticsUpdateCommiter(StatisticsProvider.this);
-
+    
     private Registration<NotificationListener> listenerRegistration;
     private Registration<NotificationListener> listenerRegistration;
-
+    
     public void start() {
     public void start() {
-
+        
         NotificationProviderService nps = this.getNotificationService();
         Registration<NotificationListener> registerNotificationListener = nps.registerNotificationListener(this.updateCommiter);
         this.listenerRegistration = registerNotificationListener;
         NotificationProviderService nps = this.getNotificationService();
         Registration<NotificationListener> registerNotificationListener = nps.registerNotificationListener(this.updateCommiter);
         this.listenerRegistration = registerNotificationListener;
-
+        
+        statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
+        
+        registerDataStoreUpdateListener(this.getDataBrokerService());
+        
         // Get Group/Meter statistics service instance
         groupStatsService = StatisticsManagerActivator.getProviderContext().
                 getRpcService(OpendaylightGroupStatisticsService.class);
         // Get Group/Meter statistics service instance
         groupStatsService = StatisticsManagerActivator.getProviderContext().
                 getRpcService(OpendaylightGroupStatisticsService.class);
-
+        
         meterStatsService = StatisticsManagerActivator.getProviderContext().
                 getRpcService(OpendaylightMeterStatisticsService.class);
         meterStatsService = StatisticsManagerActivator.getProviderContext().
                 getRpcService(OpendaylightMeterStatisticsService.class);
-
+        
         flowStatsService = StatisticsManagerActivator.getProviderContext().
                 getRpcService(OpendaylightFlowStatisticsService.class);
 
         flowStatsService = StatisticsManagerActivator.getProviderContext().
                 getRpcService(OpendaylightFlowStatisticsService.class);
 
@@ -134,10 +178,10 @@ public class StatisticsProvider implements AutoCloseable {
 
         flowTableStatsService = StatisticsManagerActivator.getProviderContext().
                 getRpcService(OpendaylightFlowTableStatisticsService.class);
 
         flowTableStatsService = StatisticsManagerActivator.getProviderContext().
                 getRpcService(OpendaylightFlowTableStatisticsService.class);
-
+        
         queueStatsService = StatisticsManagerActivator.getProviderContext().
                 getRpcService(OpendaylightQueueStatisticsService.class);
         queueStatsService = StatisticsManagerActivator.getProviderContext().
                 getRpcService(OpendaylightQueueStatisticsService.class);
-
+        
         statisticsRequesterThread = new Thread( new Runnable(){
 
             @Override
         statisticsRequesterThread = new Thread( new Runnable(){
 
             @Override
@@ -145,7 +189,7 @@ public class StatisticsProvider implements AutoCloseable {
                 while(true){
                     try {
                         statsRequestSender();
                 while(true){
                     try {
                         statsRequestSender();
-
+                        
                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
                     }catch (Exception e){
                         spLogger.error("Exception occurred while sending stats request : {}",e);
                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
                     }catch (Exception e){
                         spLogger.error("Exception occurred while sending stats request : {}",e);
@@ -153,55 +197,106 @@ public class StatisticsProvider implements AutoCloseable {
                 }
             }
         });
                 }
             }
         });
-
+        
         spLogger.debug("Statistics requester thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
         spLogger.debug("Statistics requester thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
-
+        
         statisticsRequesterThread.start();
         statisticsRequesterThread.start();
+        
+        statisticsAgerThread = new Thread( new Runnable(){
+
+            @Override
+            public void run() {
+                while(true){
+                    try {
+                        for(NodeStatisticsAger nodeStatisticsAger : statisticsCache.values()){
+                            nodeStatisticsAger.cleanStaleStatistics();
+                        }
+                        
+                        Thread.sleep(STATS_THREAD_EXECUTION_TIME);
+                    }catch (Exception e){
+                        spLogger.error("Exception occurred while sending stats request : {}",e);
+                    }
+                }
+            }
+        });
+        
+        spLogger.debug("Statistics ager thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
 
 
+        statisticsAgerThread.start();
+        
         spLogger.info("Statistics Provider started.");
     }
         spLogger.info("Statistics Provider started.");
     }
+    
+    private void registerDataStoreUpdateListener(DataBrokerService dbs) {
+        //Register for flow updates
+        InstanceIdentifier<? extends DataObject> pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class)
+                                                                    .augmentation(FlowCapableNode.class)
+                                                                    .child(Table.class)
+                                                                    .child(Flow.class).toInstance();
+        dbs.registerDataChangeListener(pathFlow, statsUpdateHandler);
+        
+        //Register for meter updates
+        InstanceIdentifier<? extends DataObject> pathMeter = InstanceIdentifier.builder(Nodes.class).child(Node.class)
+                                                    .augmentation(FlowCapableNode.class)
+                                                    .child(Meter.class).toInstance();
+
+        dbs.registerDataChangeListener(pathMeter, statsUpdateHandler);
+        
+        //Register for group updates 
+        InstanceIdentifier<? extends DataObject> pathGroup = InstanceIdentifier.builder(Nodes.class).child(Node.class)
+                                                    .augmentation(FlowCapableNode.class)
+                                                    .child(Group.class).toInstance();
+        dbs.registerDataChangeListener(pathGroup, statsUpdateHandler);
+
+        //Register for queue updates
+        InstanceIdentifier<? extends DataObject> pathQueue = InstanceIdentifier.builder(Nodes.class).child(Node.class)
+                                                                    .child(NodeConnector.class)
+                                                                    .augmentation(FlowCapableNodeConnector.class)
+                                                                    .child(Queue.class).toInstance();
+        dbs.registerDataChangeListener(pathQueue, statsUpdateHandler);
+    }
 
     protected DataModificationTransaction startChange() {
 
     protected DataModificationTransaction startChange() {
-
+        
         DataProviderService dps = this.getDataService();
         return dps.beginTransaction();
     }
         DataProviderService dps = this.getDataService();
         return dps.beginTransaction();
     }
-
+    
     private void statsRequestSender(){
     private void statsRequestSender(){
-
+        
         List<Node> targetNodes = getAllConnectedNodes();
         List<Node> targetNodes = getAllConnectedNodes();
-
+        
         if(targetNodes == null)
             return;
         if(targetNodes == null)
             return;
-
+        
 
         for (Node targetNode : targetNodes){
 
         for (Node targetNode : targetNodes){
-
+            
             if(targetNode.getAugmentation(FlowCapableNode.class) != null){
 
             if(targetNode.getAugmentation(FlowCapableNode.class) != null){
 
-                spLogger.trace("Send request for stats collection to node : {})",targetNode.getId());
-
+                spLogger.info("Send request for stats collection to node : {})",targetNode.getId());
+                
                 InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
                 InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
-
+                
                 NodeRef targetNodeRef = new NodeRef(targetInstanceId);
                 NodeRef targetNodeRef = new NodeRef(targetInstanceId);
-
+            
                 try{
                     sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
                 try{
                     sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
-
+                
                     sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
 
                     sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
                     sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
 
                     sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
-
+                
                     sendAllFlowTablesStatisticsRequest(targetNodeRef);
                     sendAllFlowTablesStatisticsRequest(targetNodeRef);
-
+                
                     sendAllQueueStatsFromAllNodeConnector (targetNodeRef);
 
                     sendAllGroupStatisticsRequest(targetNodeRef);
                     sendAllQueueStatsFromAllNodeConnector (targetNodeRef);
 
                     sendAllGroupStatisticsRequest(targetNodeRef);
-
+                    
                     sendAllMeterStatisticsRequest(targetNodeRef);
                     sendAllMeterStatisticsRequest(targetNodeRef);
-
+                    
                     sendGroupDescriptionRequest(targetNodeRef);
                     sendGroupDescriptionRequest(targetNodeRef);
-
+                    
                     sendMeterConfigStatisticsRequest(targetNodeRef);
                 }catch(Exception e){
                     spLogger.error("Exception occured while sending statistics requests : {}", e);
                     sendMeterConfigStatisticsRequest(targetNodeRef);
                 }catch(Exception e){
                     spLogger.error("Exception occured while sending statistics requests : {}", e);
@@ -210,13 +305,13 @@ public class StatisticsProvider implements AutoCloseable {
         }
     }
 
         }
     }
 
-    private void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
-        final GetFlowTablesStatisticsInputBuilder input =
+    public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
+        final GetFlowTablesStatisticsInputBuilder input = 
                 new GetFlowTablesStatisticsInputBuilder();
                 new GetFlowTablesStatisticsInputBuilder();
-
+        
         input.setNode(targetNodeRef);
 
         input.setNode(targetNodeRef);
 
-        Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
+        Future<RpcResult<GetFlowTablesStatisticsOutput>> response = 
                 flowTableStatsService.getFlowTablesStatistics(input.build());
 
         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
                 flowTableStatsService.getFlowTablesStatistics(input.build());
 
         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
@@ -224,36 +319,51 @@ public class StatisticsProvider implements AutoCloseable {
 
     }
 
 
     }
 
-    private void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+    public void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
         final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
                 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
         final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
                 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
-
+        
         input.setNode(targetNode);
         input.setNode(targetNode);
-
-        Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
+        
+        Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response = 
                 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
                 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
-
+        
         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_FLOW);
         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_FLOW);
-
+        
+    }
+    
+    public void sendFlowStatsFromTableRequest(NodeRef targetNode,Flow flow) throws InterruptedException, ExecutionException{
+        final GetFlowStatisticsFromFlowTableInputBuilder input =
+                new GetFlowStatisticsFromFlowTableInputBuilder();
+        
+        input.setNode(targetNode);
+        input.fieldsFrom(flow);
+        
+        Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response = 
+                flowStatsService.getFlowStatisticsFromFlowTable(input.build());
+        
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+                , StatsRequestType.ALL_FLOW);
+        
     }
 
     }
 
-    private void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
-
+    public void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
+        
         List<Short> tablesId = getTablesFromNode(targetNodeKey);
         List<Short> tablesId = getTablesFromNode(targetNodeKey);
-
+        
         if(tablesId.size() != 0){
             for(Short id : tablesId){
         if(tablesId.size() != 0){
             for(Short id : tablesId){
-
-                spLogger.trace("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey);
-                GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
+                
+                spLogger.info("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey);
+                GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = 
                         new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
                         new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
-
+                
                 input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
                 input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(id));
                 input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
                 input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(id));
-                Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
+                Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response = 
                         flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
                         flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
-
+                
                 multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), id);
                 this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
                         , StatsRequestType.AGGR_FLOW);
                 multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), id);
                 this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
                         , StatsRequestType.AGGR_FLOW);
@@ -263,108 +373,122 @@ public class StatisticsProvider implements AutoCloseable {
         }
     }
 
         }
     }
 
-    private void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
+    public void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+        
         final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
         final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
-
+        
         input.setNode(targetNode);
 
         input.setNode(targetNode);
 
-        Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
+        Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response = 
                 portStatsService.getAllNodeConnectorsStatistics(input.build());
         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_PORT);
 
     }
 
                 portStatsService.getAllNodeConnectorsStatistics(input.build());
         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_PORT);
 
     }
 
-    private void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
+    public void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+        
         final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
         final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
-
+        
         input.setNode(targetNode);
 
         input.setNode(targetNode);
 
-        Future<RpcResult<GetAllGroupStatisticsOutput>> response =
+        Future<RpcResult<GetAllGroupStatisticsOutput>> response = 
                 groupStatsService.getAllGroupStatistics(input.build());
                 groupStatsService.getAllGroupStatistics(input.build());
-
+        
         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_GROUP);
 
     }
         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_GROUP);
 
     }
-
-    private void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+    
+    public void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
         final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
         final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
-
+        
         input.setNode(targetNode);
 
         input.setNode(targetNode);
 
-        Future<RpcResult<GetGroupDescriptionOutput>> response =
+        Future<RpcResult<GetGroupDescriptionOutput>> response = 
                 groupStatsService.getGroupDescription(input.build());
 
         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
                 , StatsRequestType.GROUP_DESC);
 
     }
                 groupStatsService.getGroupDescription(input.build());
 
         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
                 , StatsRequestType.GROUP_DESC);
 
     }
-
-    private void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
+    
+    public void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+        
         GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
         GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
-
+        
         input.setNode(targetNode);
 
         input.setNode(targetNode);
 
-        Future<RpcResult<GetAllMeterStatisticsOutput>> response =
+        Future<RpcResult<GetAllMeterStatisticsOutput>> response = 
                 meterStatsService.getAllMeterStatistics(input.build());
                 meterStatsService.getAllMeterStatistics(input.build());
-
+        
         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_METER);;
 
     }
         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_METER);;
 
     }
-
-    private void sendMeterConfigStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
+    
+    public void sendMeterConfigStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+        
         GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
         GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
-
+        
         input.setNode(targetNode);
 
         input.setNode(targetNode);
 
-        Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
+        Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response = 
                 meterStatsService.getAllMeterConfigStatistics(input.build());
                 meterStatsService.getAllMeterConfigStatistics(input.build());
-
+        
         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
                 , StatsRequestType.METER_CONFIG);;
 
     }
         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
                 , StatsRequestType.METER_CONFIG);;
 
     }
-
-    private void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
+    
+    public void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
         GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
         GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
-
+        
         input.setNode(targetNode);
         input.setNode(targetNode);
-
-        Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
+        
+        Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response = 
                 queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
                 queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
+        
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+                , StatsRequestType.ALL_QUEUE_STATS);;
 
 
+    }
+
+    public void sendQueueStatsFromGivenNodeConnector(NodeRef targetNode,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
+        GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder();
+        
+        input.setNode(targetNode);
+        input.setNodeConnectorId(nodeConnectorId);
+        input.setQueueId(queueId);
+        Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response = 
+                queueStatsService.getQueueStatisticsFromGivenPort(input.build());
+        
         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_QUEUE_STATS);;
 
     }
 
         this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_QUEUE_STATS);;
 
     }
 
-    public ConcurrentMap<NodeId, NodeStatistics> getStatisticsCache() {
+    public ConcurrentMap<NodeId, NodeStatisticsAger> getStatisticsCache() {
         return statisticsCache;
     }
         return statisticsCache;
     }
-
+    
     private List<Node> getAllConnectedNodes(){
     private List<Node> getAllConnectedNodes(){
-
+        
         Nodes nodes = (Nodes) dps.readOperationalData(nodesIdentifier);
         if(nodes == null)
             return null;
         Nodes nodes = (Nodes) dps.readOperationalData(nodesIdentifier);
         if(nodes == null)
             return null;
-
-        spLogger.trace("Number of connected nodes : {}",nodes.getNode().size());
+        
+        spLogger.info("Number of connected nodes : {}",nodes.getNode().size());
         return nodes.getNode();
     }
         return nodes.getNode();
     }
-
+    
     private List<Short> getTablesFromNode(NodeKey nodeKey){
         InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
     private List<Short> getTablesFromNode(NodeKey nodeKey){
         InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
-
+        
         FlowCapableNode node = (FlowCapableNode)dps.readOperationalData(nodesIdentifier);
         List<Short> tablesId = new ArrayList<Short>();
         if(node != null && node.getTable()!=null){
         FlowCapableNode node = (FlowCapableNode)dps.readOperationalData(nodesIdentifier);
         List<Short> tablesId = new ArrayList<Short>();
         if(node != null && node.getTable()!=null){
-            spLogger.trace("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
+            spLogger.info("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
             for(Table table: node.getTable()){
                 tablesId.add(table.getId());
             }
             for(Table table: node.getTable()){
                 tablesId.add(table.getId());
             }
@@ -375,15 +499,17 @@ public class StatisticsProvider implements AutoCloseable {
     @SuppressWarnings("deprecation")
     @Override
     public void close(){
     @SuppressWarnings("deprecation")
     @Override
     public void close(){
-
+        
         try {
         try {
-            spLogger.trace("Statistics Provider stopped.");
+            spLogger.info("Statistics Provider stopped.");
             if (this.listenerRegistration != null) {
             if (this.listenerRegistration != null) {
-
+              
                 this.listenerRegistration.close();
                 this.listenerRegistration.close();
-
+                
                 this.statisticsRequesterThread.destroy();
                 this.statisticsRequesterThread.destroy();
-
+                
+                this.statisticsAgerThread.destroy();
+            
             }
           } catch (Throwable e) {
             throw Exceptions.sneakyThrow(e);
             }
           } catch (Throwable e) {
             throw Exceptions.sneakyThrow(e);
index 5743865d39539cbf8fb32404abf28c5b9194faa0..ace547a03c9f5d17d28764b0ffe9a53a227e1612 100644 (file)
@@ -7,10 +7,11 @@
  */
 package org.opendaylight.controller.md.statistics.manager;
 
  */
 package org.opendaylight.controller.md.statistics.manager;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 
+import org.opendaylight.controller.md.statistics.manager.NodeStatisticsAger.FlowEntry;
+import org.opendaylight.controller.md.statistics.manager.NodeStatisticsAger.QueueEntry;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
@@ -44,7 +45,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev13
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupDescStatsUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupFeaturesUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupDescStatsUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupFeaturesUpdated;
@@ -88,7 +88,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterStatisticsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterStatisticsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericQueueStatistics;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
@@ -108,68 +107,68 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
 import org.slf4j.LoggerFactory;
 
 /**
- * Class implement statistics manager related listener interface and augment all the
+ * Class implement statistics manager related listener interface and augment all the 
  * received statistics data to data stores.
  * received statistics data to data stores.
- * TODO: Need to add error message listener and clean-up the associated tx id
+ * TODO: Need to add error message listener and clean-up the associated tx id 
  * if it exists in the tx-id cache.
  * @author vishnoianil
  *
  */
 public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsListener,
  * if it exists in the tx-id cache.
  * @author vishnoianil
  *
  */
 public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsListener,
-        OpendaylightMeterStatisticsListener,
+        OpendaylightMeterStatisticsListener, 
         OpendaylightFlowStatisticsListener,
         OpendaylightPortStatisticsListener,
         OpendaylightFlowTableStatisticsListener,
         OpendaylightQueueStatisticsListener{
         OpendaylightFlowStatisticsListener,
         OpendaylightPortStatisticsListener,
         OpendaylightFlowTableStatisticsListener,
         OpendaylightQueueStatisticsListener{
-
+    
     public final static Logger sucLogger = LoggerFactory.getLogger(StatisticsUpdateCommiter.class);
 
     private final StatisticsProvider statisticsManager;
     public final static Logger sucLogger = LoggerFactory.getLogger(StatisticsUpdateCommiter.class);
 
     private final StatisticsProvider statisticsManager;
-
-    private final int unaccountedFlowsCounter = 1;
+    
+    private int unaccountedFlowsCounter = 1;
 
     public StatisticsUpdateCommiter(final StatisticsProvider manager){
 
         this.statisticsManager = manager;
     }
 
     public StatisticsUpdateCommiter(final StatisticsProvider manager){
 
         this.statisticsManager = manager;
     }
-
+    
     public StatisticsProvider getStatisticsManager(){
         return statisticsManager;
     }
     public StatisticsProvider getStatisticsManager(){
         return statisticsManager;
     }
-
+   
     @Override
     public void onMeterConfigStatsUpdated(MeterConfigStatsUpdated notification) {
         //Check if response is for the request statistics-manager sent.
         if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
             return;
     @Override
     public void onMeterConfigStatsUpdated(MeterConfigStatsUpdated notification) {
         //Check if response is for the request statistics-manager sent.
         if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
             return;
+        
+        NodeKey key = new NodeKey(notification.getId());
 
         //Add statistics to local cache
 
         //Add statistics to local cache
-        ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
+        ConcurrentMap<NodeId, NodeStatisticsAger> cache = this.statisticsManager.getStatisticsCache();
         if(!cache.containsKey(notification.getId())){
         if(!cache.containsKey(notification.getId())){
-            cache.put(notification.getId(), new NodeStatistics());
+            cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key));
         }
         }
-        cache.get(notification.getId()).setMeterConfigStats(notification.getMeterConfigStats());
-
+        cache.get(notification.getId()).updateMeterConfigStats(notification.getMeterConfigStats());
+        
         //Publish data to configuration data store
         //Publish data to configuration data store
-        NodeKey key = new NodeKey(notification.getId());
-
-        List<MeterConfigStats> eterConfigStatsList = notification.getMeterConfigStats();
-
-        for(MeterConfigStats meterConfigStats : eterConfigStatsList){
+        List<MeterConfigStats> meterConfigStatsList = notification.getMeterConfigStats();
+        
+        for(MeterConfigStats meterConfigStats : meterConfigStatsList){
             DataModificationTransaction it = this.statisticsManager.startChange();
             MeterBuilder meterBuilder = new MeterBuilder();
             MeterKey meterKey = new MeterKey(meterConfigStats.getMeterId());
             meterBuilder.setKey(meterKey);
             DataModificationTransaction it = this.statisticsManager.startChange();
             MeterBuilder meterBuilder = new MeterBuilder();
             MeterKey meterKey = new MeterKey(meterConfigStats.getMeterId());
             meterBuilder.setKey(meterKey);
-
+            
             InstanceIdentifier<Meter> meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key)
                                                                                         .augmentation(FlowCapableNode.class)
                                                                                         .child(Meter.class,meterKey).toInstance();
             InstanceIdentifier<Meter> meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key)
                                                                                         .augmentation(FlowCapableNode.class)
                                                                                         .child(Meter.class,meterKey).toInstance();
-
+            
             NodeMeterConfigStatsBuilder meterConfig= new NodeMeterConfigStatsBuilder();
             MeterConfigStatsBuilder stats = new MeterConfigStatsBuilder();
             stats.fieldsFrom(meterConfigStats);
             meterConfig.setMeterConfigStats(stats.build());
             NodeMeterConfigStatsBuilder meterConfig= new NodeMeterConfigStatsBuilder();
             MeterConfigStatsBuilder stats = new MeterConfigStatsBuilder();
             stats.fieldsFrom(meterConfigStats);
             meterConfig.setMeterConfigStats(stats.build());
-
+            
             //Update augmented data
             meterBuilder.addAugmentation(NodeMeterConfigStats.class, meterConfig.build());
             it.putOperationalData(meterRef, meterBuilder.build());
             //Update augmented data
             meterBuilder.addAugmentation(NodeMeterConfigStats.class, meterConfig.build());
             it.putOperationalData(meterRef, meterBuilder.build());
@@ -180,22 +179,15 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
 
     @Override
     public void onMeterStatisticsUpdated(MeterStatisticsUpdated notification) {
 
     @Override
     public void onMeterStatisticsUpdated(MeterStatisticsUpdated notification) {
-
+        
         //Check if response is for the request statistics-manager sent.
         if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
             return;
 
         //Check if response is for the request statistics-manager sent.
         if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
             return;
 
-        //Add statistics to local cache
-        ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
-        if(!cache.containsKey(notification.getId())){
-            cache.put(notification.getId(), new NodeStatistics());
-        }
-        cache.get(notification.getId()).setMeterStatistics(notification.getMeterStats());
-
         NodeKey key = new NodeKey(notification.getId());
         NodeKey key = new NodeKey(notification.getId());
-
+        
         List<MeterStats> meterStatsList = notification.getMeterStats();
         List<MeterStats> meterStatsList = notification.getMeterStats();
-
+        
         for(MeterStats meterStats : meterStatsList){
 
             //Publish data to configuration data store
         for(MeterStats meterStats : meterStatsList){
 
             //Publish data to configuration data store
@@ -203,11 +195,11 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
             MeterBuilder meterBuilder = new MeterBuilder();
             MeterKey meterKey = new MeterKey(meterStats.getMeterId());
             meterBuilder.setKey(meterKey);
             MeterBuilder meterBuilder = new MeterBuilder();
             MeterKey meterKey = new MeterKey(meterStats.getMeterId());
             meterBuilder.setKey(meterKey);
-
+            
             InstanceIdentifier<Meter> meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key)
                                                                                         .augmentation(FlowCapableNode.class)
                                                                                         .child(Meter.class,meterKey).toInstance();
             InstanceIdentifier<Meter> meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key)
                                                                                         .augmentation(FlowCapableNode.class)
                                                                                         .child(Meter.class,meterKey).toInstance();
-
+            
             NodeMeterStatisticsBuilder meterStatsBuilder= new NodeMeterStatisticsBuilder();
             MeterStatisticsBuilder stats = new MeterStatisticsBuilder();
             stats.fieldsFrom(meterStats);
             NodeMeterStatisticsBuilder meterStatsBuilder= new NodeMeterStatisticsBuilder();
             MeterStatisticsBuilder stats = new MeterStatisticsBuilder();
             stats.fieldsFrom(meterStats);
@@ -222,29 +214,30 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
 
     @Override
     public void onGroupDescStatsUpdated(GroupDescStatsUpdated notification) {
 
     @Override
     public void onGroupDescStatsUpdated(GroupDescStatsUpdated notification) {
-
+        
         //Check if response is for the request statistics-manager sent.
         if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
             return;
 
         //Check if response is for the request statistics-manager sent.
         if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
             return;
 
+        NodeKey key = new NodeKey(notification.getId());
+
         //Add statistics to local cache
         //Add statistics to local cache
-        ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
+        ConcurrentMap<NodeId, NodeStatisticsAger> cache = this.statisticsManager.getStatisticsCache();
         if(!cache.containsKey(notification.getId())){
         if(!cache.containsKey(notification.getId())){
-            cache.put(notification.getId(), new NodeStatistics());
+            cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key));
         }
         }
-        cache.get(notification.getId()).setGroupDescStats(notification.getGroupDescStats());
-
+        cache.get(notification.getId()).updateGroupDescStats(notification.getGroupDescStats());
+        
         //Publish data to configuration data store
         //Publish data to configuration data store
-        NodeKey key = new NodeKey(notification.getId());
         List<GroupDescStats> groupDescStatsList = notification.getGroupDescStats();
 
         for(GroupDescStats groupDescStats : groupDescStatsList){
             DataModificationTransaction it = this.statisticsManager.startChange();
         List<GroupDescStats> groupDescStatsList = notification.getGroupDescStats();
 
         for(GroupDescStats groupDescStats : groupDescStatsList){
             DataModificationTransaction it = this.statisticsManager.startChange();
-
+            
             GroupBuilder groupBuilder = new GroupBuilder();
             GroupKey groupKey = new GroupKey(groupDescStats.getGroupId());
             groupBuilder.setKey(groupKey);
             GroupBuilder groupBuilder = new GroupBuilder();
             GroupKey groupKey = new GroupKey(groupDescStats.getGroupId());
             groupBuilder.setKey(groupKey);
-
+            
             InstanceIdentifier<Group> groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key)
                                                                                         .augmentation(FlowCapableNode.class)
                                                                                         .child(Group.class,groupKey).toInstance();
             InstanceIdentifier<Group> groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key)
                                                                                         .augmentation(FlowCapableNode.class)
                                                                                         .child(Group.class,groupKey).toInstance();
@@ -253,7 +246,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
             GroupDescBuilder stats = new GroupDescBuilder();
             stats.fieldsFrom(groupDescStats);
             groupDesc.setGroupDesc(stats.build());
             GroupDescBuilder stats = new GroupDescBuilder();
             stats.fieldsFrom(groupDescStats);
             groupDesc.setGroupDesc(stats.build());
-
+            
             //Update augmented data
             groupBuilder.addAugmentation(NodeGroupDescStats.class, groupDesc.build());
 
             //Update augmented data
             groupBuilder.addAugmentation(NodeGroupDescStats.class, groupDesc.build());
 
@@ -264,29 +257,22 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
 
     @Override
     public void onGroupStatisticsUpdated(GroupStatisticsUpdated notification) {
 
     @Override
     public void onGroupStatisticsUpdated(GroupStatisticsUpdated notification) {
-
+        
         //Check if response is for the request statistics-manager sent.
         if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
             return;
 
         //Check if response is for the request statistics-manager sent.
         if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
             return;
 
-        //Add statistics to local cache
-        ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
-        if(!cache.containsKey(notification.getId())){
-            cache.put(notification.getId(), new NodeStatistics());
-        }
-        cache.get(notification.getId()).setGroupStatistics(notification.getGroupStats());
-
         //Publish data to configuration data store
         NodeKey key = new NodeKey(notification.getId());
         List<GroupStats> groupStatsList = notification.getGroupStats();
 
         for(GroupStats groupStats : groupStatsList){
             DataModificationTransaction it = this.statisticsManager.startChange();
         //Publish data to configuration data store
         NodeKey key = new NodeKey(notification.getId());
         List<GroupStats> groupStatsList = notification.getGroupStats();
 
         for(GroupStats groupStats : groupStatsList){
             DataModificationTransaction it = this.statisticsManager.startChange();
-
+            
             GroupBuilder groupBuilder = new GroupBuilder();
             GroupKey groupKey = new GroupKey(groupStats.getGroupId());
             groupBuilder.setKey(groupKey);
             GroupBuilder groupBuilder = new GroupBuilder();
             GroupKey groupKey = new GroupKey(groupStats.getGroupId());
             groupBuilder.setKey(groupKey);
-
+            
             InstanceIdentifier<Group> groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key)
                                                                                         .augmentation(FlowCapableNode.class)
                                                                                         .child(Group.class,groupKey).toInstance();
             InstanceIdentifier<Group> groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key)
                                                                                         .augmentation(FlowCapableNode.class)
                                                                                         .child(Group.class,groupKey).toInstance();
@@ -295,80 +281,66 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
             GroupStatisticsBuilder stats = new GroupStatisticsBuilder();
             stats.fieldsFrom(groupStats);
             groupStatisticsBuilder.setGroupStatistics(stats.build());
             GroupStatisticsBuilder stats = new GroupStatisticsBuilder();
             stats.fieldsFrom(groupStats);
             groupStatisticsBuilder.setGroupStatistics(stats.build());
-
+            
             //Update augmented data
             groupBuilder.addAugmentation(NodeGroupStatistics.class, groupStatisticsBuilder.build());
             it.putOperationalData(groupRef, groupBuilder.build());
             it.commit();
         }
     }
             //Update augmented data
             groupBuilder.addAugmentation(NodeGroupStatistics.class, groupStatisticsBuilder.build());
             it.putOperationalData(groupRef, groupBuilder.build());
             it.commit();
         }
     }
-
+    
     @Override
     public void onMeterFeaturesUpdated(MeterFeaturesUpdated notification) {
 
     @Override
     public void onMeterFeaturesUpdated(MeterFeaturesUpdated notification) {
 
-        //Add statistics to local cache
-        ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
-        if(!cache.containsKey(notification.getId())){
-            cache.put(notification.getId(), new NodeStatistics());
-        }
         MeterFeaturesBuilder meterFeature = new MeterFeaturesBuilder();
         meterFeature.setMeterBandSupported(notification.getMeterBandSupported());
         meterFeature.setMeterCapabilitiesSupported(notification.getMeterCapabilitiesSupported());
         meterFeature.setMaxBands(notification.getMaxBands());
         meterFeature.setMaxColor(notification.getMaxColor());
         meterFeature.setMaxMeter(notification.getMaxMeter());
         MeterFeaturesBuilder meterFeature = new MeterFeaturesBuilder();
         meterFeature.setMeterBandSupported(notification.getMeterBandSupported());
         meterFeature.setMeterCapabilitiesSupported(notification.getMeterCapabilitiesSupported());
         meterFeature.setMaxBands(notification.getMaxBands());
         meterFeature.setMaxColor(notification.getMaxColor());
         meterFeature.setMaxMeter(notification.getMaxMeter());
-
-        cache.get(notification.getId()).setMeterFeatures(meterFeature.build());
-
+        
         //Publish data to configuration data store
         DataModificationTransaction it = this.statisticsManager.startChange();
         NodeKey key = new NodeKey(notification.getId());
         NodeRef ref = getNodeRef(key);
         //Publish data to configuration data store
         DataModificationTransaction it = this.statisticsManager.startChange();
         NodeKey key = new NodeKey(notification.getId());
         NodeRef ref = getNodeRef(key);
-
-        final NodeBuilder nodeData = new NodeBuilder();
+        
+        final NodeBuilder nodeData = new NodeBuilder(); 
         nodeData.setKey(key);
         nodeData.setKey(key);
-
+        
         NodeMeterFeaturesBuilder nodeMeterFeatures= new NodeMeterFeaturesBuilder();
         nodeMeterFeatures.setMeterFeatures(meterFeature.build());
         NodeMeterFeaturesBuilder nodeMeterFeatures= new NodeMeterFeaturesBuilder();
         nodeMeterFeatures.setMeterFeatures(meterFeature.build());
-
+        
         //Update augmented data
         nodeData.addAugmentation(NodeMeterFeatures.class, nodeMeterFeatures.build());
         //Update augmented data
         nodeData.addAugmentation(NodeMeterFeatures.class, nodeMeterFeatures.build());
-
+        
         InstanceIdentifier<? extends Object> refValue = ref.getValue();
         it.putOperationalData(refValue, nodeData.build());
         it.commit();
     }
         InstanceIdentifier<? extends Object> refValue = ref.getValue();
         it.putOperationalData(refValue, nodeData.build());
         it.commit();
     }
-
+    
     @Override
     public void onGroupFeaturesUpdated(GroupFeaturesUpdated notification) {
 
     @Override
     public void onGroupFeaturesUpdated(GroupFeaturesUpdated notification) {
 
-        //Add statistics to local cache
-        ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
-        if(!cache.containsKey(notification.getId())){
-            cache.put(notification.getId(), new NodeStatistics());
-        }
-
         GroupFeaturesBuilder groupFeatures = new GroupFeaturesBuilder();
         groupFeatures.setActions(notification.getActions());
         groupFeatures.setGroupCapabilitiesSupported(notification.getGroupCapabilitiesSupported());
         groupFeatures.setGroupTypesSupported(notification.getGroupTypesSupported());
         groupFeatures.setMaxGroups(notification.getMaxGroups());
         GroupFeaturesBuilder groupFeatures = new GroupFeaturesBuilder();
         groupFeatures.setActions(notification.getActions());
         groupFeatures.setGroupCapabilitiesSupported(notification.getGroupCapabilitiesSupported());
         groupFeatures.setGroupTypesSupported(notification.getGroupTypesSupported());
         groupFeatures.setMaxGroups(notification.getMaxGroups());
-        cache.get(notification.getId()).setGroupFeatures(groupFeatures.build());
-
+        
         //Publish data to configuration data store
         DataModificationTransaction it = this.statisticsManager.startChange();
         NodeKey key = new NodeKey(notification.getId());
         NodeRef ref = getNodeRef(key);
         //Publish data to configuration data store
         DataModificationTransaction it = this.statisticsManager.startChange();
         NodeKey key = new NodeKey(notification.getId());
         NodeRef ref = getNodeRef(key);
-
-        final NodeBuilder nodeData = new NodeBuilder();
+        
+        final NodeBuilder nodeData = new NodeBuilder(); 
         nodeData.setKey(key);
         nodeData.setKey(key);
-
+        
         NodeGroupFeaturesBuilder nodeGroupFeatures= new NodeGroupFeaturesBuilder();
         nodeGroupFeatures.setGroupFeatures(groupFeatures.build());
         NodeGroupFeaturesBuilder nodeGroupFeatures= new NodeGroupFeaturesBuilder();
         nodeGroupFeatures.setGroupFeatures(groupFeatures.build());
-
+        
         //Update augmented data
         nodeData.addAugmentation(NodeGroupFeatures.class, nodeGroupFeatures.build());
         //Update augmented data
         nodeData.addAugmentation(NodeGroupFeatures.class, nodeGroupFeatures.build());
-
+        
         InstanceIdentifier<? extends Object> refValue = ref.getValue();
         it.putOperationalData(refValue, nodeData.build());
         it.commit();
         InstanceIdentifier<? extends Object> refValue = ref.getValue();
         it.putOperationalData(refValue, nodeData.build());
         it.commit();
@@ -376,17 +348,17 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
 
     @Override
     public void onFlowsStatisticsUpdate(FlowsStatisticsUpdate notification) {
 
     @Override
     public void onFlowsStatisticsUpdate(FlowsStatisticsUpdate notification) {
-
+        
         //Check if response is for the request statistics-manager sent.
         if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
             return;
 
         NodeKey key = new NodeKey(notification.getId());
         sucLogger.debug("Received flow stats update : {}",notification.toString());
         //Check if response is for the request statistics-manager sent.
         if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
             return;
 
         NodeKey key = new NodeKey(notification.getId());
         sucLogger.debug("Received flow stats update : {}",notification.toString());
-
+        
         for(FlowAndStatisticsMapList map: notification.getFlowAndStatisticsMapList()){
             short tableId = map.getTableId();
         for(FlowAndStatisticsMapList map: notification.getFlowAndStatisticsMapList()){
             short tableId = map.getTableId();
-
+            
             DataModificationTransaction it = this.statisticsManager.startChange();
 
             boolean foundOriginalFlow = false;
             DataModificationTransaction it = this.statisticsManager.startChange();
 
             boolean foundOriginalFlow = false;
@@ -416,26 +388,25 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
             flow.setPriority(map.getPriority());
             flow.setStrict(map.isStrict());
             flow.setTableId(tableId);
             flow.setPriority(map.getPriority());
             flow.setStrict(map.isStrict());
             flow.setTableId(tableId);
-
+                
             Flow flowRule = flow.build();
             Flow flowRule = flow.build();
-
+                
             FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder();
             stats.setByteCount(map.getByteCount());
             stats.setPacketCount(map.getPacketCount());
             stats.setDuration(map.getDuration());
             FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder();
             stats.setByteCount(map.getByteCount());
             stats.setPacketCount(map.getPacketCount());
             stats.setDuration(map.getDuration());
-
+                
             GenericStatistics flowStats = stats.build();
             GenericStatistics flowStats = stats.build();
-
+                
             //Add statistics to local cache
             //Add statistics to local cache
-            ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
+            ConcurrentMap<NodeId, NodeStatisticsAger> cache = this.statisticsManager.getStatisticsCache();
             if(!cache.containsKey(notification.getId())){
             if(!cache.containsKey(notification.getId())){
-                cache.put(notification.getId(), new NodeStatistics());
+                cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key));
             }
             }
-            if(!cache.get(notification.getId()).getFlowAndStatsMap().containsKey(tableId)){
-                cache.get(notification.getId()).getFlowAndStatsMap().put(tableId, new HashMap<Flow,GenericStatistics>());
-            }
-            cache.get(notification.getId()).getFlowAndStatsMap().get(tableId).put(flowRule,flowStats);
-
+            NodeStatisticsAger nsa = cache.get(notification.getId());
+            FlowEntry flowStatsEntry = nsa.new FlowEntry(tableId,flowRule);
+            cache.get(notification.getId()).updateFlowStats(flowStatsEntry);
+                
             //Augment the data to the flow node
 
             FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder();
             //Augment the data to the flow node
 
             FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder();
@@ -460,17 +431,17 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
             flowStatistics.setTableId(tableId);
 
             flowStatisticsData.setFlowStatistics(flowStatistics.build());
             flowStatistics.setTableId(tableId);
 
             flowStatisticsData.setFlowStatistics(flowStatistics.build());
-
+                
             sucLogger.debug("Flow : {}",flowRule.toString());
             sucLogger.debug("Statistics to augment : {}",flowStatistics.build().toString());
 
             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
             sucLogger.debug("Flow : {}",flowRule.toString());
             sucLogger.debug("Statistics to augment : {}",flowStatistics.build().toString());
 
             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
-
+            
             Table table= (Table)it.readConfigurationData(tableRef);
 
             //TODO: Not a good way to do it, need to figure out better way.
             Table table= (Table)it.readConfigurationData(tableRef);
 
             //TODO: Not a good way to do it, need to figure out better way.
-            //TODO: major issue in any alternate approach is that flow key is incrementally assigned
+            //TODO: major issue in any alternate approach is that flow key is incrementally assigned 
             //to the flows stored in data store.
             if(table != null){
 
             //to the flows stored in data store.
             if(table != null){
 
@@ -483,7 +454,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
                                 .child(Flow.class,existingFlow.getKey()).toInstance();
                         flowBuilder.setKey(existingFlow.getKey());
                         flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
                                 .child(Flow.class,existingFlow.getKey()).toInstance();
                         flowBuilder.setKey(existingFlow.getKey());
                         flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
-                        sucLogger.trace("Found matching flow in the datastore, augmenting statistics");
+                        sucLogger.info("Found matching flow in the datastore, augmenting statistics");
                         foundOriginalFlow = true;
                         it.putOperationalData(flowRef, flowBuilder.build());
                         it.commit();
                         foundOriginalFlow = true;
                         it.putOperationalData(flowRef, flowBuilder.build());
                         it.commit();
@@ -491,11 +462,36 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
                     }
                 }
             }
                     }
                 }
             }
+            
+            table= (Table)it.readOperationalData(tableRef);
+            if(!foundOriginalFlow && table != null){
 
 
+                for(Flow existingFlow : table.getFlow()){
+                    FlowStatisticsData augmentedflowStatisticsData = existingFlow.getAugmentation(FlowStatisticsData.class);
+                    if(augmentedflowStatisticsData != null){
+                        FlowBuilder existingOperationalFlow = new FlowBuilder();
+                        existingOperationalFlow.fieldsFrom(augmentedflowStatisticsData.getFlowStatistics());
+                        sucLogger.debug("Existing unaccounted flow in operational data store : {}",existingFlow.toString());
+                        if(flowEquals(flowRule,existingOperationalFlow.build())){
+                            InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
+                                    .augmentation(FlowCapableNode.class)
+                                    .child(Table.class, new TableKey(tableId))
+                                    .child(Flow.class,existingFlow.getKey()).toInstance();
+                            flowBuilder.setKey(existingFlow.getKey());
+                            flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
+                            sucLogger.debug("Found matching flow in the operational datastore, augmenting statistics");
+                            foundOriginalFlow = true;
+                            it.putOperationalData(flowRef, flowBuilder.build());
+                            it.commit();
+                            break;
+                        }
+                    }
+                }
+            }
             if(!foundOriginalFlow){
                 sucLogger.debug("Associated original flow is not found in data store. Augmenting flow in operational data store");
             if(!foundOriginalFlow){
                 sucLogger.debug("Associated original flow is not found in data store. Augmenting flow in operational data store");
-                //TODO: Temporary fix: format [ 1+tableid+1+unaccounted flow counter]
-                long flowKey = Long.parseLong(new String("1"+Short.toString(tableId)+"1"+Integer.toString(this.unaccountedFlowsCounter)));
+                long flowKey = Long.parseLong(new String("1"+Short.toString(tableId)+"0"+Integer.toString(this.unaccountedFlowsCounter)));
+                this.unaccountedFlowsCounter++;
                 FlowKey newFlowKey = new FlowKey(new FlowId(Long.toString(flowKey)));
                 InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
                         .augmentation(FlowCapableNode.class)
                 FlowKey newFlowKey = new FlowKey(new FlowId(Long.toString(flowKey)));
                 InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
                         .augmentation(FlowCapableNode.class)
@@ -503,7 +499,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
                         .child(Flow.class,newFlowKey).toInstance();
                 flowBuilder.setKey(newFlowKey);
                 flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
                         .child(Flow.class,newFlowKey).toInstance();
                 flowBuilder.setKey(newFlowKey);
                 flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
-                sucLogger.trace("Flow was no present in data store, augmenting statistics as an unaccounted flow");
+                sucLogger.info("Flow was no present in data store, augmenting statistics as an unaccounted flow");
                 it.putOperationalData(flowRef, flowBuilder.build());
                 it.commit();
             }
                 it.putOperationalData(flowRef, flowBuilder.build());
                 it.commit();
             }
@@ -518,10 +514,10 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
 
         NodeKey key = new NodeKey(notification.getId());
         sucLogger.debug("Received aggregate flow statistics update : {}",notification.toString());
 
         NodeKey key = new NodeKey(notification.getId());
         sucLogger.debug("Received aggregate flow statistics update : {}",notification.toString());
-
+        
         Short tableId = this.statisticsManager.getMultipartMessageManager().getTableIdForTxId(notification.getTransactionId());
         if(tableId != null){
         Short tableId = this.statisticsManager.getMultipartMessageManager().getTableIdForTxId(notification.getTransactionId());
         if(tableId != null){
-
+            
             DataModificationTransaction it = this.statisticsManager.startChange();
 
             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
             DataModificationTransaction it = this.statisticsManager.startChange();
 
             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
@@ -533,13 +529,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
             aggregateFlowStatisticsBuilder.setFlowCount(notification.getFlowCount());
             aggregateFlowStatisticsBuilder.setPacketCount(notification.getPacketCount());
             aggregateFlowStatisticsDataBuilder.setAggregateFlowStatistics(aggregateFlowStatisticsBuilder.build());
             aggregateFlowStatisticsBuilder.setFlowCount(notification.getFlowCount());
             aggregateFlowStatisticsBuilder.setPacketCount(notification.getPacketCount());
             aggregateFlowStatisticsDataBuilder.setAggregateFlowStatistics(aggregateFlowStatisticsBuilder.build());
-
-            ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
-            if(!cache.containsKey(notification.getId())){
-                cache.put(notification.getId(), new NodeStatistics());
-            }
-            cache.get(notification.getId()).getTableAndAggregateFlowStatsMap().put(tableId,aggregateFlowStatisticsBuilder.build());
-
+            
             sucLogger.debug("Augment aggregate statistics: {} for table {} on Node {}",aggregateFlowStatisticsBuilder.build().toString(),tableId,key);
 
             TableBuilder tableBuilder = new TableBuilder();
             sucLogger.debug("Augment aggregate statistics: {} for table {} on Node {}",aggregateFlowStatisticsBuilder.build().toString(),tableId,key);
 
             TableBuilder tableBuilder = new TableBuilder();
@@ -559,20 +549,13 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
 
         NodeKey key = new NodeKey(notification.getId());
         sucLogger.debug("Received port stats update : {}",notification.toString());
 
         NodeKey key = new NodeKey(notification.getId());
         sucLogger.debug("Received port stats update : {}",notification.toString());
-
-        //Add statistics to local cache
-        ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
-        if(!cache.containsKey(notification.getId())){
-            cache.put(notification.getId(), new NodeStatistics());
-        }
-
-
+        
         List<NodeConnectorStatisticsAndPortNumberMap> portsStats = notification.getNodeConnectorStatisticsAndPortNumberMap();
         for(NodeConnectorStatisticsAndPortNumberMap portStats : portsStats){
         List<NodeConnectorStatisticsAndPortNumberMap> portsStats = notification.getNodeConnectorStatisticsAndPortNumberMap();
         for(NodeConnectorStatisticsAndPortNumberMap portStats : portsStats){
-
+            
             DataModificationTransaction it = this.statisticsManager.startChange();
 
             DataModificationTransaction it = this.statisticsManager.startChange();
 
-            FlowCapableNodeConnectorStatisticsBuilder statisticsBuilder
+            FlowCapableNodeConnectorStatisticsBuilder statisticsBuilder 
                                             = new FlowCapableNodeConnectorStatisticsBuilder();
             statisticsBuilder.setBytes(portStats.getBytes());
             statisticsBuilder.setCollisionCount(portStats.getCollisionCount());
                                             = new FlowCapableNodeConnectorStatisticsBuilder();
             statisticsBuilder.setBytes(portStats.getBytes());
             statisticsBuilder.setCollisionCount(portStats.getCollisionCount());
@@ -585,20 +568,17 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
             statisticsBuilder.setReceiveOverRunError(portStats.getReceiveOverRunError());
             statisticsBuilder.setTransmitDrops(portStats.getTransmitDrops());
             statisticsBuilder.setTransmitErrors(portStats.getTransmitErrors());
             statisticsBuilder.setReceiveOverRunError(portStats.getReceiveOverRunError());
             statisticsBuilder.setTransmitDrops(portStats.getTransmitDrops());
             statisticsBuilder.setTransmitErrors(portStats.getTransmitErrors());
-
-            //Update data in the cache
-            cache.get(notification.getId()).getNodeConnectorStats().put(portStats.getNodeConnectorId(), statisticsBuilder.build());
-
+            
             //Augment data to the node-connector
             //Augment data to the node-connector
-            FlowCapableNodeConnectorStatisticsDataBuilder statisticsDataBuilder =
+            FlowCapableNodeConnectorStatisticsDataBuilder statisticsDataBuilder = 
                     new FlowCapableNodeConnectorStatisticsDataBuilder();
                     new FlowCapableNodeConnectorStatisticsDataBuilder();
-
+            
             statisticsDataBuilder.setFlowCapableNodeConnectorStatistics(statisticsBuilder.build());
             statisticsDataBuilder.setFlowCapableNodeConnectorStatistics(statisticsBuilder.build());
-
+            
             InstanceIdentifier<NodeConnector> nodeConnectorRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key).child(NodeConnector.class, new NodeConnectorKey(portStats.getNodeConnectorId())).toInstance();
             InstanceIdentifier<NodeConnector> nodeConnectorRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key).child(NodeConnector.class, new NodeConnectorKey(portStats.getNodeConnectorId())).toInstance();
-
+            
             NodeConnector nodeConnector = (NodeConnector)it.readOperationalData(nodeConnectorRef);
             NodeConnector nodeConnector = (NodeConnector)it.readOperationalData(nodeConnectorRef);
-
+            
             if(nodeConnector != null){
                 sucLogger.debug("Augmenting port statistics {} to port {}",statisticsDataBuilder.build().toString(),nodeConnectorRef.toString());
                 NodeConnectorBuilder nodeConnectorBuilder = new NodeConnectorBuilder();
             if(nodeConnector != null){
                 sucLogger.debug("Augmenting port statistics {} to port {}",statisticsDataBuilder.build().toString(),nodeConnectorRef.toString());
                 NodeConnectorBuilder nodeConnectorBuilder = new NodeConnectorBuilder();
@@ -617,32 +597,26 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
 
         NodeKey key = new NodeKey(notification.getId());
         sucLogger.debug("Received flow table statistics update : {}",notification.toString());
 
         NodeKey key = new NodeKey(notification.getId());
         sucLogger.debug("Received flow table statistics update : {}",notification.toString());
-
+        
         List<FlowTableAndStatisticsMap> flowTablesStatsList = notification.getFlowTableAndStatisticsMap();
         for (FlowTableAndStatisticsMap ftStats : flowTablesStatsList){
         List<FlowTableAndStatisticsMap> flowTablesStatsList = notification.getFlowTableAndStatisticsMap();
         for (FlowTableAndStatisticsMap ftStats : flowTablesStatsList){
-
+            
             DataModificationTransaction it = this.statisticsManager.startChange();
 
             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(ftStats.getTableId().getValue())).toInstance();
             DataModificationTransaction it = this.statisticsManager.startChange();
 
             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(ftStats.getTableId().getValue())).toInstance();
-
+            
             FlowTableStatisticsDataBuilder statisticsDataBuilder = new FlowTableStatisticsDataBuilder();
             FlowTableStatisticsDataBuilder statisticsDataBuilder = new FlowTableStatisticsDataBuilder();
-
+            
             FlowTableStatisticsBuilder statisticsBuilder = new FlowTableStatisticsBuilder();
             statisticsBuilder.setActiveFlows(ftStats.getActiveFlows());
             statisticsBuilder.setPacketsLookedUp(ftStats.getPacketsLookedUp());
             statisticsBuilder.setPacketsMatched(ftStats.getPacketsMatched());
             FlowTableStatisticsBuilder statisticsBuilder = new FlowTableStatisticsBuilder();
             statisticsBuilder.setActiveFlows(ftStats.getActiveFlows());
             statisticsBuilder.setPacketsLookedUp(ftStats.getPacketsLookedUp());
             statisticsBuilder.setPacketsMatched(ftStats.getPacketsMatched());
-
+            
             statisticsDataBuilder.setFlowTableStatistics(statisticsBuilder.build());
             statisticsDataBuilder.setFlowTableStatistics(statisticsBuilder.build());
-
-            ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
-            if(!cache.containsKey(notification.getId())){
-                cache.put(notification.getId(), new NodeStatistics());
-            }
-            cache.get(notification.getId()).getFlowTableAndStatisticsMap().put(ftStats.getTableId().getValue(),statisticsBuilder.build());
-
+            
             sucLogger.debug("Augment flow table statistics: {} for table {} on Node {}",statisticsBuilder.build().toString(),ftStats.getTableId(),key);
             sucLogger.debug("Augment flow table statistics: {} for table {} on Node {}",statisticsBuilder.build().toString(),ftStats.getTableId(),key);
-
+            
             TableBuilder tableBuilder = new TableBuilder();
             tableBuilder.setKey(new TableKey(ftStats.getTableId().getValue()));
             tableBuilder.addAugmentation(FlowTableStatisticsData.class, statisticsDataBuilder.build());
             TableBuilder tableBuilder = new TableBuilder();
             tableBuilder.setKey(new TableKey(ftStats.getTableId().getValue()));
             tableBuilder.addAugmentation(FlowTableStatisticsData.class, statisticsDataBuilder.build());
@@ -653,70 +627,66 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
 
     @Override
     public void onQueueStatisticsUpdate(QueueStatisticsUpdate notification) {
 
     @Override
     public void onQueueStatisticsUpdate(QueueStatisticsUpdate notification) {
-
+        
         //Check if response is for the request statistics-manager sent.
         if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
             return;
 
         NodeKey key = new NodeKey(notification.getId());
         sucLogger.debug("Received queue stats update : {}",notification.toString());
         //Check if response is for the request statistics-manager sent.
         if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
             return;
 
         NodeKey key = new NodeKey(notification.getId());
         sucLogger.debug("Received queue stats update : {}",notification.toString());
-
+        
         //Add statistics to local cache
         //Add statistics to local cache
-        ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
+        ConcurrentMap<NodeId, NodeStatisticsAger> cache = this.statisticsManager.getStatisticsCache();
         if(!cache.containsKey(notification.getId())){
         if(!cache.containsKey(notification.getId())){
-            cache.put(notification.getId(), new NodeStatistics());
+            cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key));
         }
         }
-
+        
+        NodeStatisticsAger nsa = cache.get(notification.getId());
+        
         List<QueueIdAndStatisticsMap> queuesStats = notification.getQueueIdAndStatisticsMap();
         for(QueueIdAndStatisticsMap swQueueStats : queuesStats){
         List<QueueIdAndStatisticsMap> queuesStats = notification.getQueueIdAndStatisticsMap();
         for(QueueIdAndStatisticsMap swQueueStats : queuesStats){
-
-            if(!cache.get(notification.getId()).getNodeConnectorAndQueuesStatsMap().containsKey(swQueueStats.getNodeConnectorId())){
-                cache.get(notification.getId()).getNodeConnectorAndQueuesStatsMap().put(swQueueStats.getNodeConnectorId(), new HashMap<QueueId,GenericQueueStatistics>());
-            }
-
+            
+            QueueEntry queueEntry = nsa.new QueueEntry(swQueueStats.getNodeConnectorId(),swQueueStats.getQueueId());
+            nsa.updateQueueStats(queueEntry);
+            
             FlowCapableNodeConnectorQueueStatisticsDataBuilder queueStatisticsDataBuilder = new FlowCapableNodeConnectorQueueStatisticsDataBuilder();
             FlowCapableNodeConnectorQueueStatisticsDataBuilder queueStatisticsDataBuilder = new FlowCapableNodeConnectorQueueStatisticsDataBuilder();
-
+            
             FlowCapableNodeConnectorQueueStatisticsBuilder queueStatisticsBuilder = new FlowCapableNodeConnectorQueueStatisticsBuilder();
             FlowCapableNodeConnectorQueueStatisticsBuilder queueStatisticsBuilder = new FlowCapableNodeConnectorQueueStatisticsBuilder();
-
+            
             queueStatisticsBuilder.fieldsFrom(swQueueStats);
             queueStatisticsBuilder.fieldsFrom(swQueueStats);
-
+            
             queueStatisticsDataBuilder.setFlowCapableNodeConnectorQueueStatistics(queueStatisticsBuilder.build());
             queueStatisticsDataBuilder.setFlowCapableNodeConnectorQueueStatistics(queueStatisticsBuilder.build());
-
-            cache.get(notification.getId()).getNodeConnectorAndQueuesStatsMap()
-                                            .get(swQueueStats.getNodeConnectorId())
-                                            .put(swQueueStats.getQueueId(), queueStatisticsBuilder.build());
-
-
+            
             DataModificationTransaction it = this.statisticsManager.startChange();
 
             DataModificationTransaction it = this.statisticsManager.startChange();
 
-            InstanceIdentifier<Queue> queueRef
+            InstanceIdentifier<Queue> queueRef 
                     = InstanceIdentifier.builder(Nodes.class)
                                         .child(Node.class, key)
                                         .child(NodeConnector.class, new NodeConnectorKey(swQueueStats.getNodeConnectorId()))
                                         .augmentation(FlowCapableNodeConnector.class)
                                         .child(Queue.class, new QueueKey(swQueueStats.getQueueId())).toInstance();
                     = InstanceIdentifier.builder(Nodes.class)
                                         .child(Node.class, key)
                                         .child(NodeConnector.class, new NodeConnectorKey(swQueueStats.getNodeConnectorId()))
                                         .augmentation(FlowCapableNodeConnector.class)
                                         .child(Queue.class, new QueueKey(swQueueStats.getQueueId())).toInstance();
-
+            
             QueueBuilder queueBuilder = new QueueBuilder();
             queueBuilder.addAugmentation(FlowCapableNodeConnectorQueueStatisticsData.class, queueStatisticsDataBuilder.build());
             queueBuilder.setKey(new QueueKey(swQueueStats.getQueueId()));
 
             QueueBuilder queueBuilder = new QueueBuilder();
             queueBuilder.addAugmentation(FlowCapableNodeConnectorQueueStatisticsData.class, queueStatisticsDataBuilder.build());
             queueBuilder.setKey(new QueueKey(swQueueStats.getQueueId()));
 
-            sucLogger.trace("Augmenting queue statistics {} of queue {} to port {}"
+            sucLogger.info("Augmenting queue statistics {} of queue {} to port {}"
                                         ,queueStatisticsDataBuilder.build().toString(),
                                         swQueueStats.getQueueId(),
                                         swQueueStats.getNodeConnectorId());
                                         ,queueStatisticsDataBuilder.build().toString(),
                                         swQueueStats.getQueueId(),
                                         swQueueStats.getNodeConnectorId());
-
+            
             it.putOperationalData(queueRef, queueBuilder.build());
             it.commit();
             it.putOperationalData(queueRef, queueBuilder.build());
             it.commit();
-
+            
         }
         }
-
+        
     }
 
     private NodeRef getNodeRef(NodeKey nodeKey){
         InstanceIdentifierBuilder<?> builder = InstanceIdentifier.builder(Nodes.class).child(Node.class, nodeKey);
         return new NodeRef(builder.toInstance());
     }
     }
 
     private NodeRef getNodeRef(NodeKey nodeKey){
         InstanceIdentifierBuilder<?> builder = InstanceIdentifier.builder(Nodes.class).child(Node.class, nodeKey);
         return new NodeRef(builder.toInstance());
     }
-
+    
     public boolean flowEquals(Flow statsFlow, Flow storedFlow) {
         if (statsFlow.getClass() != storedFlow.getClass()) {
             return false;
     public boolean flowEquals(Flow statsFlow, Flow storedFlow) {
         if (statsFlow.getClass() != storedFlow.getClass()) {
             return false;
@@ -780,28 +750,28 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
         }
         return true;
     }
         }
         return true;
     }
-
+    
     /**
      * Explicit equals method to compare the 'match' for flows stored in the data-stores and flow fetched from the switch.
     /**
      * Explicit equals method to compare the 'match' for flows stored in the data-stores and flow fetched from the switch.
-     * Usecase: e.g If user don't set any ethernet source and destination address for match,data store will store null for
+     * Usecase: e.g If user don't set any ethernet source and destination address for match,data store will store null for 
      * these address.
      * e.g [_ethernetMatch=EthernetMatch [_ethernetDestination=null, _ethernetSource=null, _ethernetType=
      * EthernetType [_type=EtherType [_value=2048], _mask=null, augmentation=[]]
      * these address.
      * e.g [_ethernetMatch=EthernetMatch [_ethernetDestination=null, _ethernetSource=null, _ethernetType=
      * EthernetType [_type=EtherType [_value=2048], _mask=null, augmentation=[]]
-     *
-     * But when you fetch the flows from switch, openflow driver library converts all zero bytes of mac address in the
-     * message stream to 00:00:00:00:00:00. Following string shows how library interpret the zero mac address bytes and
-     * eventually when translator convert it to MD-SAL match, this is how it looks
-     * [_ethernetDestination=EthernetDestination [_address=MacAddress [_value=00:00:00:00:00:00], _mask=null, augmentation=[]],
-     * _ethernetSource=EthernetSource [_address=MacAddress [_value=00:00:00:00:00:00], _mask=null, augmentation=[]],
+     * 
+     * But when you fetch the flows from switch, openflow driver library converts all zero bytes of mac address in the 
+     * message stream to 00:00:00:00:00:00. Following string shows how library interpret the zero mac address bytes and 
+     * eventually when translator convert it to MD-SAL match, this is how it looks 
+     * [_ethernetDestination=EthernetDestination [_address=MacAddress [_value=00:00:00:00:00:00], _mask=null, augmentation=[]], 
+     * _ethernetSource=EthernetSource [_address=MacAddress [_value=00:00:00:00:00:00], _mask=null, augmentation=[]], 
      * _ethernetType=EthernetType [_type=EtherType [_value=2048], _mask=null, augmentation=[]]
      * _ethernetType=EthernetType [_type=EtherType [_value=2048], _mask=null, augmentation=[]]
-     *
-     * Similarly for inPort, if user/application don't set any value for it, FRM will store null value for it in data store.
+     * 
+     * Similarly for inPort, if user/application don't set any value for it, FRM will store null value for it in data store. 
      * When we fetch the same flow (with its statistics) from switch, plugin converts its value to openflow:X:0.
      * When we fetch the same flow (with its statistics) from switch, plugin converts its value to openflow:X:0.
-     *  e.g _inPort=Uri [_value=openflow:1:0]
-     *
+     *  e.g _inPort=Uri [_value=openflow:1:0]  
+     * 
      * So this custom equals method add additional check to take care of these scenario, in case any match element is null in data-store-flow, but not
      * in the flow fetched from switch.
      * So this custom equals method add additional check to take care of these scenario, in case any match element is null in data-store-flow, but not
      * in the flow fetched from switch.
-     *
+     * 
      * @param statsFlow
      * @param storedFlow
      * @return
      * @param statsFlow
      * @param storedFlow
      * @return
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateHandler.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateHandler.java
new file mode 100644 (file)
index 0000000..f04c29f
--- /dev/null
@@ -0,0 +1,156 @@
+/*
+ * Copyright IBM Corporation, 2013.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Following are two main responsibilities of the class
+ * 1) Listen for the create changes in config data store for tree nodes (Flow,Group,Meter,Queue) 
+ * and send statistics request to the switch to fetch the statistics
+ * 
+ * 2)Listen for the remove changes in config data store for tree nodes (Flow,Group,Meter,Queue)
+ * and remove the relative statistics data from operational data store.
+ * 
+ * @author avishnoi@in.ibm.com
+ *
+ */
+public class StatisticsUpdateHandler implements DataChangeListener {
+
+    public final static Logger suhLogger = LoggerFactory.getLogger(StatisticsUpdateHandler.class);
+
+    private final StatisticsProvider statisticsManager;
+    
+    public StatisticsUpdateHandler(final StatisticsProvider manager){
+
+        this.statisticsManager = manager;
+    }
+    
+    public StatisticsProvider getStatisticsManager(){
+        return statisticsManager;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void onDataChanged(DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+        
+        Map<InstanceIdentifier<?>, DataObject> additions = change.getCreatedConfigurationData();
+        for (InstanceIdentifier<? extends DataObject> dataObjectInstance : additions.keySet()) {
+            DataObject dataObject = additions.get(dataObjectInstance);
+            InstanceIdentifier<Node> nodeII = dataObjectInstance.firstIdentifierOf(Node.class);
+            NodeRef nodeRef = new NodeRef(nodeII);
+            if(dataObject instanceof Flow){
+                Flow flow = (Flow) dataObject;
+                try {
+                    this.statisticsManager.sendFlowStatsFromTableRequest(nodeRef, flow);
+                } catch (InterruptedException | ExecutionException e) {
+                    suhLogger.warn("Following exception occured while sending flow statistics request newly added flow: {}", e);
+                }
+            }
+            if(dataObject instanceof Meter){
+                try {
+                    this.statisticsManager.sendMeterConfigStatisticsRequest(nodeRef);
+                } catch (InterruptedException | ExecutionException e) {
+                    suhLogger.warn("Following exception occured while sending meter statistics request for newly added meter: {}", e);
+                }
+            }
+            if(dataObject instanceof Group){
+                try {
+                    this.statisticsManager.sendGroupDescriptionRequest(nodeRef);
+                } catch (InterruptedException | ExecutionException e) {
+                    suhLogger.warn("Following exception occured while sending group description request for newly added group: {}", e);
+                }
+            }
+            if(dataObject instanceof Queue){
+                Queue queue = (Queue) dataObject;
+                InstanceIdentifier<NodeConnector> nodeConnectorII = dataObjectInstance.firstIdentifierOf(NodeConnector.class);
+                NodeConnectorKey nodeConnectorKey = InstanceIdentifier.keyOf(nodeConnectorII);
+                try {
+                    this.statisticsManager.sendQueueStatsFromGivenNodeConnector(nodeRef, nodeConnectorKey.getId(), queue.getQueueId());
+                } catch (InterruptedException | ExecutionException e) {
+                    suhLogger.warn("Following exception occured while sending queue statistics request for newly added group: {}", e);
+                }
+            }
+        }
+            
+        Set<InstanceIdentifier<? extends DataObject>> removals = change.getRemovedConfigurationData();
+        for (InstanceIdentifier<? extends DataObject> dataObjectInstance : removals) {
+            DataObject dataObject = change.getOriginalConfigurationData().get(dataObjectInstance);
+            
+            if(dataObject instanceof Flow){
+                InstanceIdentifier<Flow> flowII = (InstanceIdentifier<Flow>)dataObjectInstance;
+                InstanceIdentifier<?> flowAugmentation = 
+                        InstanceIdentifier.builder(flowII).augmentation(FlowStatisticsData.class).toInstance();
+                removeAugmentedOperationalData(flowAugmentation);
+            }
+            if(dataObject instanceof Meter){
+                InstanceIdentifier<Meter> meterII = (InstanceIdentifier<Meter>)dataObjectInstance;
+                
+                InstanceIdentifier<?> nodeMeterConfigStatsAugmentation = 
+                        InstanceIdentifier.builder(meterII).augmentation(NodeMeterConfigStats.class).toInstance();
+                removeAugmentedOperationalData(nodeMeterConfigStatsAugmentation);
+
+                InstanceIdentifier<?> nodeMeterStatisticsAugmentation = 
+                        InstanceIdentifier.builder(meterII).augmentation(NodeMeterStatistics.class).toInstance();
+                removeAugmentedOperationalData(nodeMeterStatisticsAugmentation);
+            }
+            
+            if(dataObject instanceof Group){
+                InstanceIdentifier<Group> groupII = (InstanceIdentifier<Group>)dataObjectInstance;
+                
+                InstanceIdentifier<?> nodeGroupDescStatsAugmentation = 
+                        InstanceIdentifier.builder(groupII).augmentation(NodeGroupDescStats.class).toInstance();
+                removeAugmentedOperationalData(nodeGroupDescStatsAugmentation);
+
+                InstanceIdentifier<?> nodeGroupStatisticsAugmentation = 
+                        InstanceIdentifier.builder(groupII).augmentation(NodeGroupStatistics.class).toInstance();
+                removeAugmentedOperationalData(nodeGroupStatisticsAugmentation);
+            }
+            
+            if(dataObject instanceof Queue){
+                InstanceIdentifier<Queue> queueII = (InstanceIdentifier<Queue>)dataObjectInstance;
+                
+                InstanceIdentifier<?> nodeConnectorQueueStatisticsDataAugmentation = 
+                        InstanceIdentifier.builder(queueII).augmentation(FlowCapableNodeConnectorQueueStatisticsData.class).toInstance();
+                removeAugmentedOperationalData(nodeConnectorQueueStatisticsDataAugmentation);
+            }
+        }
+    }
+    
+    private void removeAugmentedOperationalData(InstanceIdentifier<? extends DataObject> dataObjectInstance ){
+        if(dataObjectInstance != null){
+            DataModificationTransaction it = this.statisticsManager.startChange();
+            it.removeOperationalData(dataObjectInstance);
+            it.commit();
+        }
+    }
+}