Merge "Bug fixes for netconf southbound plugin."
authorTony Tkacik <ttkacik@cisco.com>
Sat, 15 Feb 2014 03:25:14 +0000 (03:25 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Sat, 15 Feb 2014 03:25:14 +0000 (03:25 +0000)
23 files changed:
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/ComponentActivator.xtend
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.xtend
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/InventoryAndReadAdapter.xtend
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/InventoryNotificationProvider.java [new file with mode: 0644]
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/NodeConnectorDataChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.xtend
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/AbstractStatsTracker.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowCapableTracker.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowStatsEntry.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowStatsTracker.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowTableStatsTracker.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/GroupDescStatsTracker.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/GroupStatsTracker.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MeterConfigStatsTracker.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MeterStatsTracker.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MultipartMessageManager.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeConnectorStatsTracker.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/QueueStatsEntry.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/QueueStatsTracker.java [new file with mode: 0644]
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsListener.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateHandler.java

index a6fc4b0..a59c2c1 100644 (file)
@@ -251,6 +251,7 @@ package class SalCompatibilityProvider implements BindingAwareProvider {
         topology.dataService = session.getSALService(DataProviderService)
         tpProvider.dataService = session.getSALService(DataProviderService)
 
+        inventory.start();
 
         tpProvider.start();
 
index fac12ee..8a0874e 100644 (file)
@@ -199,9 +199,11 @@ class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowLi
     }
 
     private def Future<RpcResult<TransactionStatus>> internalModifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) {
-        val flowId = getCache().remove(oldFlow);
+        var flowId = getCache().remove(oldFlow);
         if(flowId == null){
-            throw new IllegalArgumentException("oldFlow is unknown");
+            LOG.error("oldFlow not found in cache : " + oldFlow.hashCode);
+            flowId = UUID.randomUUID();
+            getCache().put(oldFlow, flowId);
         }
 
         getCache().put(newFlow, flowId);
@@ -212,7 +214,9 @@ class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowLi
     private def Future<RpcResult<TransactionStatus>> internalRemoveFlowAsync(Node node, Flow adflow, long rid){
         val flowId = getCache().remove(adflow);
         if(flowId == null){
-            throw new IllegalArgumentException("adflow is unknown");
+            //throw new IllegalArgumentException("adflow not found in cache : " + adflow.hashCode);
+            LOG.error("adflow not found in cache : " + adflow.hashCode);
+            return null;
         }
         val flow = adflow.toMDFlow(flowId.toString());
         val modification = this._dataBrokerService.beginTransaction();
@@ -227,6 +231,10 @@ class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowLi
     }
 
     private def toFutureStatus(Future<RpcResult<TransactionStatus>> future){
+        if(future == null){
+            return toStatus(true);
+        }
+
         try {
             val result = future.get();
             return toStatus(result);
index 60e4324..0c211fd 100644 (file)
@@ -11,6 +11,9 @@ import java.util.ArrayList
 import java.util.Collections
 import java.util.List
 import java.util.Set
+import java.util.ArrayList;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.CopyOnWriteArrayList;
 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService
@@ -76,17 +79,18 @@ import static extension org.opendaylight.controller.sal.compatibility.NodeMappin
 import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader
 import java.util.concurrent.ConcurrentHashMap
 import java.util.Map
+import java.util.HashMap
 
 class InventoryAndReadAdapter implements IPluginInReadService,
-                                                                                        IPluginInInventoryService,
-                                                                                        OpendaylightInventoryListener,
-                                                                                        OpendaylightFlowStatisticsListener,
-                                                                                        OpendaylightFlowTableStatisticsListener,
-                                                                                        OpendaylightPortStatisticsListener {
+                                             IPluginInInventoryService,
+                                             OpendaylightInventoryListener,
+                                             OpendaylightFlowStatisticsListener,
+                                             OpendaylightFlowTableStatisticsListener,
+                                             OpendaylightPortStatisticsListener {
 
     private static val LOG = LoggerFactory.getLogger(InventoryAndReadAdapter);
 
-       private static val OPENFLOWV10_TABLE_ID = new Integer(0).shortValue;
+    private static val OPENFLOWV10_TABLE_ID = new Integer(0).shortValue;
     @Property
     DataBrokerService dataService;
 
@@ -111,21 +115,34 @@ class InventoryAndReadAdapter implements IPluginInReadService,
     @Property
     List<IPluginOutInventoryService> inventoryPublisher = new CopyOnWriteArrayList<IPluginOutInventoryService>();
 
-       def setInventoryPublisher(IPluginOutInventoryService listener){
+    private final InventoryNotificationProvider inventoryNotificationProvider = new InventoryNotificationProvider();
+
+    private final Map<InstanceIdentifier.PathArgument, List<InstanceIdentifier.PathArgument>> nodeToNodeConnectorsMap = new ConcurrentHashMap<InstanceIdentifier.PathArgument, List<InstanceIdentifier.PathArgument>>();
+
+    private final Lock nodeToNodeConnectorsLock = new ReentrantLock();
+
+
+    def start(){
+        inventoryNotificationProvider.dataProviderService = dataProviderService;
+        inventoryNotificationProvider.inventoryPublisher = inventoryPublisher;
+        // inventoryNotificationProvider.start();
+    }
+
+    def setInventoryPublisher(IPluginOutInventoryService listener){
         inventoryPublisher.add(listener);
-       }
+    }
 
-       def unsetInventoryPublisher(IPluginOutInventoryService listener){
+    def unsetInventoryPublisher(IPluginOutInventoryService listener){
         inventoryPublisher.remove(listener);
-       }
+    }
 
     def setReadPublisher(IPluginOutReadService listener) {
-       statisticsPublisher.add(listener);
+        statisticsPublisher.add(listener);
     }
     
     def unsetReadPublisher (IPluginOutReadService listener) {
-       if( listener != null)
-               statisticsPublisher.remove(listener);
+        if( listener != null)
+            statisticsPublisher.remove(listener);
     }
 
     protected def startChange() {
@@ -140,33 +157,33 @@ class InventoryAndReadAdapter implements IPluginInReadService,
     override readAllFlow(Node node, boolean cached) {
 
         val output = new ArrayList<FlowOnNode>();
-               val tableRef = InstanceIdentifier.builder(Nodes)
-                                                                               .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, InventoryMapping.toNodeKey(node))
-                                                       .augmentation(FlowCapableNode).child(Table, new TableKey(OPENFLOWV10_TABLE_ID)).toInstance();
-               
-               val it = this.startChange();
-               
-               val table= it.readConfigurationData(tableRef) as Table;
-               
-               if(table != null){
-                       LOG.trace("Number of flows installed in table 0 of node {} : {}",node,table.flow.size);
-                       
-                       for(flow : table.flow){
-                               
-                               val adsalFlow = ToSalConversionsUtils.toFlow(flow,node);
-                               val statsFromDataStore = flow.getAugmentation(FlowStatisticsData);
-                               
-                               if(statsFromDataStore != null){
-                                       val it = new FlowOnNode(adsalFlow);
-                                       byteCount =  statsFromDataStore.flowStatistics.byteCount.value.longValue;
-                                       packetCount = statsFromDataStore.flowStatistics.packetCount.value.longValue;
-                                       durationSeconds = statsFromDataStore.flowStatistics.duration.second.value.intValue;
-                                       durationNanoseconds = statsFromDataStore.flowStatistics.duration.nanosecond.value.intValue;
-                                       
-                                       output.add(it);
-                               }
-                       }
-               }
+        val tableRef = InstanceIdentifier.builder(Nodes)
+                                        .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, InventoryMapping.toNodeKey(node))
+                                        .augmentation(FlowCapableNode).child(Table, new TableKey(OPENFLOWV10_TABLE_ID)).toInstance();
+        
+        val it = this.startChange();
+        
+        val table= it.readConfigurationData(tableRef) as Table;
+        
+        if(table != null){
+            LOG.trace("Number of flows installed in table 0 of node {} : {}",node,table.flow.size);
+            
+            for(flow : table.flow){
+                
+                val adsalFlow = ToSalConversionsUtils.toFlow(flow,node);
+                val statsFromDataStore = flow.getAugmentation(FlowStatisticsData);
+                
+                if(statsFromDataStore != null){
+                    val it = new FlowOnNode(adsalFlow);
+                    byteCount =  statsFromDataStore.flowStatistics.byteCount.value.longValue;
+                    packetCount = statsFromDataStore.flowStatistics.packetCount.value.longValue;
+                    durationSeconds = statsFromDataStore.flowStatistics.duration.second.value.intValue;
+                    durationNanoseconds = statsFromDataStore.flowStatistics.duration.nanosecond.value.intValue;
+                    
+                    output.add(it);
+                }
+            }
+        }
         
         //TODO (main): Shell we send request to the switch? It will make async request to the switch.
         // Once plugin receive response, it will let adaptor know through onFlowStatisticsUpdate()
@@ -180,35 +197,35 @@ class InventoryAndReadAdapter implements IPluginInReadService,
     }
 
     override readAllNodeConnector(Node node, boolean cached) {
-       
-       val ret = new ArrayList<NodeConnectorStatistics>();
-               val nodeRef = InstanceIdentifier.builder(Nodes)
-                                                                       .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, InventoryMapping.toNodeKey(node))
-                                                                       .toInstance();
-               
-               val provider = this.startChange();
-               
-               val dsNode= provider.readConfigurationData(nodeRef) as org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-               
-               if(dsNode != null){
-                       
-                       for (dsNodeConnector : dsNode.nodeConnector){
-                               val nodeConnectorRef = InstanceIdentifier.builder(Nodes)
-                                                                       .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, InventoryMapping.toNodeKey(node))
-                                                                       .child(NodeConnector, dsNodeConnector.key)
-                                                                       .toInstance();
-                               
-                               val nodeConnectorFromDS = provider.readConfigurationData(nodeConnectorRef) as NodeConnector;
-                               
-                               if(nodeConnectorFromDS != null){
-                                       val nodeConnectorStatsFromDs = nodeConnectorFromDS.getAugmentation(FlowCapableNodeConnectorStatisticsData) as FlowCapableNodeConnectorStatistics;
-                                       
-                                       ret.add(toNodeConnectorStatistics(nodeConnectorStatsFromDs.flowCapableNodeConnectorStatistics,dsNode.id,dsNodeConnector.id));
-                               }
-                       }
-               }
-
-               //TODO: Refer TODO (main)
+        
+        val ret = new ArrayList<NodeConnectorStatistics>();
+        val nodeRef = InstanceIdentifier.builder(Nodes)
+                                    .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, InventoryMapping.toNodeKey(node))
+                                    .toInstance();
+        
+        val provider = this.startChange();
+        
+        val dsNode= provider.readConfigurationData(nodeRef) as org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+        
+         if(dsNode != null){
+             
+             for (dsNodeConnector : dsNode.nodeConnector){
+                val nodeConnectorRef = InstanceIdentifier.builder(Nodes)
+                                    .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, InventoryMapping.toNodeKey(node))
+                                    .child(NodeConnector, dsNodeConnector.key)
+                                    .toInstance();
+                 
+                 val nodeConnectorFromDS = provider.readConfigurationData(nodeConnectorRef) as NodeConnector;
+                 
+                 if(nodeConnectorFromDS != null){
+                     val nodeConnectorStatsFromDs = nodeConnectorFromDS.getAugmentation(FlowCapableNodeConnectorStatisticsData) as FlowCapableNodeConnectorStatistics;
+                     
+                    ret.add(toNodeConnectorStatistics(nodeConnectorStatsFromDs.flowCapableNodeConnectorStatistics,dsNode.id,dsNodeConnector.id));
+                 }
+             }
+         }
+
+        //TODO: Refer TODO (main)
         val input = new GetAllNodeConnectorsStatisticsInputBuilder();
         input.setNode(node.toNodeRef);
         nodeConnectorStatisticsService.getAllNodeConnectorsStatistics(input.build());
@@ -216,23 +233,23 @@ class InventoryAndReadAdapter implements IPluginInReadService,
     }
 
     override readAllNodeTable(Node node, boolean cached) {
-       val ret = new ArrayList<NodeTableStatistics>();
-       
-               val dsFlowCapableNode= readFlowCapableNode(node.toNodeRef)
-               
-               if(dsFlowCapableNode != null){
-                       
-                       for (table : dsFlowCapableNode.table){
-                               
-                               val tableStats = table.getAugmentation(FlowTableStatisticsData);
-                               
-                               if(tableStats != null){
-                                       ret.add(toNodeTableStatistics(tableStats.flowTableStatistics,table.id,node));
-                               }
-                       }
-               }
-
-               //TODO: Refer TODO (main)
+        val ret = new ArrayList<NodeTableStatistics>();
+        
+        val dsFlowCapableNode= readFlowCapableNode(node.toNodeRef)
+        
+         if(dsFlowCapableNode != null){
+             
+             for (table : dsFlowCapableNode.table){
+                 
+                 val tableStats = table.getAugmentation(FlowTableStatisticsData);
+                 
+                 if(tableStats != null){
+                     ret.add(toNodeTableStatistics(tableStats.flowTableStatistics,table.id,node));
+                 }
+             }
+         }
+
+        //TODO: Refer TODO (main)
         val input = new GetFlowTablesStatisticsInputBuilder();
         input.setNode(node.toNodeRef);
         flowTableStatisticsService.getFlowTablesStatistics(input.build);
@@ -241,39 +258,39 @@ class InventoryAndReadAdapter implements IPluginInReadService,
 
     override readDescription(Node node, boolean cached) {
         return toNodeDescription(node.toNodeRef);
-       }
+    }
 
     override readFlow(Node node, Flow targetFlow, boolean cached) {
-               var FlowOnNode ret= null;
-               
-               val tableRef = InstanceIdentifier.builder(Nodes)
-                                                                               .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, InventoryMapping.toNodeKey(node))
-                                                       .augmentation(FlowCapableNode).child(Table, new TableKey(OPENFLOWV10_TABLE_ID)).toInstance();
-               
-               val it = this.startChange();
-               
-               val table= it.readConfigurationData(tableRef) as Table;
-               
-               if(table != null){
-                       LOG.trace("Number of flows installed in table 0 of node {} : {}",node,table.flow.size);
-                       
-                       for(mdsalFlow : table.flow){
-                               if(FromSalConversionsUtils.flowEquals(mdsalFlow, MDFlowMapping.toMDSalflow(targetFlow))){
-                                       val statsFromDataStore = mdsalFlow.getAugmentation(FlowStatisticsData);
-                                       
-                                       if(statsFromDataStore != null){
-                                               LOG.debug("Found matching flow in the data store flow table ");
-                                               val it = new FlowOnNode(targetFlow);
-                                               byteCount =  statsFromDataStore.flowStatistics.byteCount.value.longValue;
-                                               packetCount = statsFromDataStore.flowStatistics.packetCount.value.longValue;
-                                               durationSeconds = statsFromDataStore.flowStatistics.duration.second.value.intValue;
-                                               durationNanoseconds = statsFromDataStore.flowStatistics.duration.nanosecond.value.intValue;
-                                               
-                                               ret = it;
-                                       }
-                               }                       
-                       }
-               }
+        var FlowOnNode ret= null;
+        
+        val tableRef = InstanceIdentifier.builder(Nodes)
+                                        .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, InventoryMapping.toNodeKey(node))
+                                        .augmentation(FlowCapableNode).child(Table, new TableKey(OPENFLOWV10_TABLE_ID)).toInstance();
+        
+        val it = this.startChange();
+        
+        val table= it.readConfigurationData(tableRef) as Table;
+        
+        if(table != null){
+            LOG.trace("Number of flows installed in table 0 of node {} : {}",node,table.flow.size);
+            
+            for(mdsalFlow : table.flow){
+                if(FromSalConversionsUtils.flowEquals(mdsalFlow, MDFlowMapping.toMDSalflow(targetFlow))){
+                    val statsFromDataStore = mdsalFlow.getAugmentation(FlowStatisticsData);
+                    
+                    if(statsFromDataStore != null){
+                        LOG.debug("Found matching flow in the data store flow table ");
+                        val it = new FlowOnNode(targetFlow);
+                        byteCount =  statsFromDataStore.flowStatistics.byteCount.value.longValue;
+                        packetCount = statsFromDataStore.flowStatistics.packetCount.value.longValue;
+                        durationSeconds = statsFromDataStore.flowStatistics.duration.second.value.intValue;
+                        durationNanoseconds = statsFromDataStore.flowStatistics.duration.nanosecond.value.intValue;
+                        
+                        ret = it;
+                    }
+                }            
+            }
+        }
         
         //TODO: Refer TODO (main)
         val input = new GetFlowStatisticsFromFlowTableInputBuilder;
@@ -282,30 +299,30 @@ class InventoryAndReadAdapter implements IPluginInReadService,
         flowStatisticsService.getFlowStatisticsFromFlowTable(input.build)
         
         return ret;
-       
+        
     }
 
     override readNodeConnector(org.opendaylight.controller.sal.core.NodeConnector connector, boolean cached) {
-       var NodeConnectorStatistics  nodeConnectorStatistics = null;
-       
-               val nodeConnectorRef = InstanceIdentifier.builder(Nodes)
-                                                                       .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, InventoryMapping.toNodeKey(connector.node))
-                                                                       .child(NodeConnector, InventoryMapping.toNodeConnectorKey(connector))
-                                                                       .toInstance();
-               val provider = this.startChange();
-                               
-               val nodeConnectorFromDS = provider.readConfigurationData(nodeConnectorRef) as NodeConnector;
-                               
-               if(nodeConnectorFromDS != null){
-                       val nodeConnectorStatsFromDs = nodeConnectorFromDS.getAugmentation(FlowCapableNodeConnectorStatisticsData) as FlowCapableNodeConnectorStatistics;
-                       if(nodeConnectorStatsFromDs != null) {
-                               nodeConnectorStatistics = toNodeConnectorStatistics(nodeConnectorStatsFromDs.flowCapableNodeConnectorStatistics,
-                                                                                                                                               InventoryMapping.toNodeKey(connector.node).id,
-                                                                                                                                               InventoryMapping.toNodeConnectorKey(connector).id);
-                       }
-               }
-
-               //TODO: Refer TODO (main)
+        var NodeConnectorStatistics  nodeConnectorStatistics = null;
+    
+        val nodeConnectorRef = InstanceIdentifier.builder(Nodes)
+                                    .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, InventoryMapping.toNodeKey(connector.node))
+                                    .child(NodeConnector, InventoryMapping.toNodeConnectorKey(connector))
+                                    .toInstance();
+         val provider = this.startChange();
+                 
+         val nodeConnectorFromDS = provider.readConfigurationData(nodeConnectorRef) as NodeConnector;
+                 
+         if(nodeConnectorFromDS != null){
+            val nodeConnectorStatsFromDs = nodeConnectorFromDS.getAugmentation(FlowCapableNodeConnectorStatisticsData) as FlowCapableNodeConnectorStatistics;
+            if(nodeConnectorStatsFromDs != null) {
+                nodeConnectorStatistics = toNodeConnectorStatistics(nodeConnectorStatsFromDs.flowCapableNodeConnectorStatistics,
+                                                                        InventoryMapping.toNodeKey(connector.node).id,
+                                                                        InventoryMapping.toNodeConnectorKey(connector).id);
+            }
+        }
+
+        //TODO: Refer TODO (main)
         val input = new GetNodeConnectorStatisticsInputBuilder();
         input.setNode(connector.node.toNodeRef);
         input.setNodeConnectorId(InventoryMapping.toNodeConnectorKey(connector).id);
@@ -314,25 +331,25 @@ class InventoryAndReadAdapter implements IPluginInReadService,
     }
 
     override readNodeTable(NodeTable nodeTable, boolean cached) {
-       var NodeTableStatistics nodeStats = null
-       
-       val tableRef = InstanceIdentifier.builder(Nodes)
-                                                                               .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, InventoryMapping.toNodeKey(nodeTable.node))
-                                                       .augmentation(FlowCapableNode).child(Table, new TableKey(nodeTable.ID as Short)).toInstance();
-               
-               val it = this.startChange();
-               
-               val table= it.readConfigurationData(tableRef) as Table;
-               
-               if(table != null){
-                       val tableStats = table.getAugmentation(FlowTableStatisticsData);
-                               
-                       if(tableStats != null){
-                               nodeStats =  toNodeTableStatistics(tableStats.flowTableStatistics,table.id,nodeTable.node);
-                       }
-               }
-
-               //TODO: Refer TODO (main)
+        var NodeTableStatistics nodeStats = null
+        
+        val tableRef = InstanceIdentifier.builder(Nodes)
+                                        .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, InventoryMapping.toNodeKey(nodeTable.node))
+                                        .augmentation(FlowCapableNode).child(Table, new TableKey(nodeTable.ID as Short)).toInstance();
+        
+        val it = this.startChange();
+        
+        val table= it.readConfigurationData(tableRef) as Table;
+        
+        if(table != null){
+            val tableStats = table.getAugmentation(FlowTableStatisticsData);
+                 
+             if(tableStats != null){
+                 nodeStats =  toNodeTableStatistics(tableStats.flowTableStatistics,table.id,nodeTable.node);
+            }
+        }
+
+        //TODO: Refer TODO (main)
         val input = new GetFlowTablesStatisticsInputBuilder();
         input.setNode(nodeTable.node.toNodeRef);
         flowTableStatisticsService.getFlowTablesStatistics(input.build);
@@ -341,19 +358,22 @@ class InventoryAndReadAdapter implements IPluginInReadService,
     }
 
     override onNodeConnectorRemoved(NodeConnectorRemoved update) {
-        // NOOP
+        // Never received
     }
 
     override onNodeRemoved(NodeRemoved notification) {
         val properties = Collections.<org.opendaylight.controller.sal.core.Property>emptySet();
 
+        removeNodeConnectors(notification.nodeRef.value);
+
         publishNodeUpdate(notification.nodeRef.toADNode, UpdateType.REMOVED, properties);
     }
 
     override onNodeConnectorUpdated(NodeConnectorUpdated update) {
         var updateType = UpdateType.CHANGED;
-        if ( this._dataService.readOperationalData(update.nodeConnectorRef.value as InstanceIdentifier<? extends DataObject>) == null ){
+        if(!isKnownNodeConnector(update.nodeConnectorRef.value)){
             updateType = UpdateType.ADDED;
+            recordNodeConnector(update.nodeConnectorRef.value);
         }
 
         var nodeConnector = update.nodeConnectorRef.toADNodeConnector
@@ -369,16 +389,16 @@ class InventoryAndReadAdapter implements IPluginInReadService,
             updateType = UpdateType.ADDED;
         }
         publishNodeUpdate(notification.nodeRef.toADNode, updateType, notification.toADNodeProperties);
-        
-               //Notify the listeners of IPluginOutReadService
-        
+
+        //Notify the listeners of IPluginOutReadService
+
         for (statsPublisher : statisticsPublisher){
-                       val nodeRef = InstanceIdentifier.builder(Nodes).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node,new NodeKey(notification.id)).toInstance;
+            val nodeRef = InstanceIdentifier.builder(Nodes).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node,new NodeKey(notification.id)).toInstance;
             val description = notification.nodeRef.toNodeDescription
             if(description != null) {
-                         statsPublisher.descriptionStatisticsUpdated(nodeRef.toADNode,description);
-                       }
-               }
+              statsPublisher.descriptionStatisticsUpdated(nodeRef.toADNode,description);
+            }
+        }
     }
 
     override getNodeProps() {
@@ -461,50 +481,50 @@ class InventoryAndReadAdapter implements IPluginInReadService,
 
     private def toNodeConnectorStatistics(
         org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.NodeConnectorStatistics nodeConnectorStatistics, NodeId nodeId, NodeConnectorId nodeConnectorId) {
-               
-                       val it = new NodeConnectorStatistics();
-                       
-                       receivePacketCount = nodeConnectorStatistics.packets.received.longValue;
-                       transmitPacketCount = nodeConnectorStatistics.packets.transmitted.longValue;
-                       
-                       receiveByteCount = nodeConnectorStatistics.bytes.received.longValue;
-                       transmitByteCount = nodeConnectorStatistics.bytes.transmitted.longValue;
-                       
-                       receiveDropCount = nodeConnectorStatistics.receiveDrops.longValue;
-                       transmitDropCount = nodeConnectorStatistics.transmitDrops.longValue;
-                       
-                       receiveErrorCount = nodeConnectorStatistics.receiveErrors.longValue;
-                       transmitErrorCount = nodeConnectorStatistics.transmitErrors.longValue;
-                       
-                       receiveFrameErrorCount = nodeConnectorStatistics.receiveFrameError.longValue;
-                       receiveOverRunErrorCount = nodeConnectorStatistics.receiveOverRunError.longValue;
-                       receiveCRCErrorCount = nodeConnectorStatistics.receiveCrcError.longValue;
-                       collisionCount = nodeConnectorStatistics.collisionCount.longValue;
-                       
-                       val nodeConnectorRef = InstanceIdentifier.builder(Nodes)
-                                                               .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node,new NodeKey(nodeId))
-                                                               .child(NodeConnector,new NodeConnectorKey(nodeConnectorId)).toInstance;
-                       
-                       nodeConnector = NodeMapping.toADNodeConnector(new NodeConnectorRef(nodeConnectorRef));
-                       
-                       return it;
-    }
-
-       private def toNodeTableStatistics(
-               FlowTableStatistics tableStats,
-               Short tableId,Node node){
-               var it = new NodeTableStatistics();
-               
-               activeCount = tableStats.activeFlows.value.intValue;
-               lookupCount = tableStats.packetsLookedUp.value.intValue;
-               matchedCount = tableStats.packetsMatched.value.intValue;
-               name = tableId.toString;
-               nodeTable = new NodeTable(NodeMapping.MD_SAL_TYPE,tableId,node);
-               return it;
-       }
-       
-       private def toNodeDescription(NodeRef nodeRef){
-               val capableNode = readFlowCapableNode(nodeRef);
+            
+            val it = new NodeConnectorStatistics();
+            
+            receivePacketCount = nodeConnectorStatistics.packets.received.longValue;
+            transmitPacketCount = nodeConnectorStatistics.packets.transmitted.longValue;
+            
+            receiveByteCount = nodeConnectorStatistics.bytes.received.longValue;
+            transmitByteCount = nodeConnectorStatistics.bytes.transmitted.longValue;
+            
+            receiveDropCount = nodeConnectorStatistics.receiveDrops.longValue;
+            transmitDropCount = nodeConnectorStatistics.transmitDrops.longValue;
+            
+            receiveErrorCount = nodeConnectorStatistics.receiveErrors.longValue;
+            transmitErrorCount = nodeConnectorStatistics.transmitErrors.longValue;
+            
+            receiveFrameErrorCount = nodeConnectorStatistics.receiveFrameError.longValue;
+            receiveOverRunErrorCount = nodeConnectorStatistics.receiveOverRunError.longValue;
+            receiveCRCErrorCount = nodeConnectorStatistics.receiveCrcError.longValue;
+            collisionCount = nodeConnectorStatistics.collisionCount.longValue;
+            
+            val nodeConnectorRef = InstanceIdentifier.builder(Nodes)
+                                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node,new NodeKey(nodeId))
+                                .child(NodeConnector,new NodeConnectorKey(nodeConnectorId)).toInstance;
+            
+            nodeConnector = NodeMapping.toADNodeConnector(new NodeConnectorRef(nodeConnectorRef));
+            
+            return it;
+    }
+
+    private def toNodeTableStatistics(
+        FlowTableStatistics tableStats,
+        Short tableId,Node node){
+        var it = new NodeTableStatistics();
+        
+        activeCount = tableStats.activeFlows.value.intValue;
+        lookupCount = tableStats.packetsLookedUp.value.intValue;
+        matchedCount = tableStats.packetsMatched.value.intValue;
+        name = tableId.toString;
+        nodeTable = new NodeTable(NodeMapping.MD_SAL_TYPE,tableId,node);
+        return it;
+    }
+    
+    private def toNodeDescription(NodeRef nodeRef){
+        val capableNode = readFlowCapableNode(nodeRef);
         if(capableNode !=null) {
             val it = new NodeDescription()
             manufacturer = capableNode.manufacturer
@@ -515,101 +535,148 @@ class InventoryAndReadAdapter implements IPluginInReadService,
             return it;
          }
          return null;
-       }
+    }
     
     
     def Edge toADEdge(Link link) {
         new Edge(link.source.toADNodeConnector,link.destination.toADNodeConnector)
     }
-       
-       /*
-        * OpendaylightFlowStatisticsListener interface implementation
-        */
-       override onAggregateFlowStatisticsUpdate(AggregateFlowStatisticsUpdate notification) {
+    
+    /*
+     * OpendaylightFlowStatisticsListener interface implementation
+     */
+    override onAggregateFlowStatisticsUpdate(AggregateFlowStatisticsUpdate notification) {
         //Ignoring this notification as there does not seem to be a way to bubble this up to AD-SAL
-       }
-       
-       override onFlowsStatisticsUpdate(FlowsStatisticsUpdate notification) {
-               
-               val adsalFlowsStatistics = new ArrayList<FlowOnNode>();
-               val nodeRef = InstanceIdentifier.builder(Nodes).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node,new NodeKey(notification.id)).toInstance;
-               
-               for(flowStats : notification.flowAndStatisticsMapList){
-                       if(flowStats.tableId == 0)
-                               adsalFlowsStatistics.add(toFlowOnNode(flowStats,nodeRef.toADNode));
-               }
-               
-               for (statsPublisher : statisticsPublisher){
-                       statsPublisher.nodeFlowStatisticsUpdated(nodeRef.toADNode,adsalFlowsStatistics);
-               }
-               
-       }
-       /*
-        * OpendaylightFlowTableStatisticsListener interface implementation
-        */     
-       override onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate notification) {
-               var adsalFlowTableStatistics = new ArrayList<NodeTableStatistics>();
-               
-               for(stats : notification.flowTableAndStatisticsMap){
-                       if (stats.tableId.value == 0){
-                               val it = new NodeTableStatistics();
-                               activeCount = stats.activeFlows.value.intValue;
-                               lookupCount = stats.packetsLookedUp.value.longValue;
-                               matchedCount = stats.packetsMatched.value.longValue;
-                               
-                               adsalFlowTableStatistics.add(it);
-                       }
-               }
-               for (statsPublisher : statisticsPublisher){
-                       val nodeRef = InstanceIdentifier.builder(Nodes).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node,new NodeKey(notification.id)).toInstance;
-                       statsPublisher.nodeTableStatisticsUpdated(nodeRef.toADNode,adsalFlowTableStatistics);
-               }
-       }
-       
-       /*
-        * OpendaylightPortStatisticsUpdate interface implementation
-        */
-       override onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate notification) {
-               
-               val adsalPortStatistics  = new ArrayList<NodeConnectorStatistics>();
-               
-               for(nodeConnectorStatistics : notification.nodeConnectorStatisticsAndPortNumberMap){
-                       adsalPortStatistics.add(toNodeConnectorStatistics(nodeConnectorStatistics,notification.id,nodeConnectorStatistics.nodeConnectorId));
-               }
-               
-               for (statsPublisher : statisticsPublisher){
-                       val nodeRef = InstanceIdentifier.builder(Nodes).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node,new NodeKey(notification.id)).toInstance;
-                       statsPublisher.nodeConnectorStatisticsUpdated(nodeRef.toADNode,adsalPortStatistics);
-               }
-               
-       }
-       
-       private static def toFlowOnNode (FlowAndStatisticsMapList flowAndStatsMap,Node node){
-               
-               val it = new FlowOnNode(ToSalConversionsUtils.toFlow(flowAndStatsMap,node));
-               
-               byteCount = flowAndStatsMap.byteCount.value.longValue;
-               packetCount = flowAndStatsMap.packetCount.value.longValue;
-               durationSeconds = flowAndStatsMap.duration.second.value.intValue;
-               durationNanoseconds = flowAndStatsMap.duration.nanosecond.value.intValue;
-               
-               return it;
-       }
-
-       override  getConfiguredNotConnectedNodes() {
+    }
+    
+    override onFlowsStatisticsUpdate(FlowsStatisticsUpdate notification) {
+        
+        val adsalFlowsStatistics = new ArrayList<FlowOnNode>();
+        val nodeRef = InstanceIdentifier.builder(Nodes).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node,new NodeKey(notification.id)).toInstance;
+        
+        for(flowStats : notification.flowAndStatisticsMapList){
+            if(flowStats.tableId == 0)
+                adsalFlowsStatistics.add(toFlowOnNode(flowStats,nodeRef.toADNode));
+        }
+        
+        for (statsPublisher : statisticsPublisher){
+            statsPublisher.nodeFlowStatisticsUpdated(nodeRef.toADNode,adsalFlowsStatistics);
+        }
+        
+    }
+    /*
+     * OpendaylightFlowTableStatisticsListener interface implementation
+     */    
+    override onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate notification) {
+        var adsalFlowTableStatistics = new ArrayList<NodeTableStatistics>();
+        
+        for(stats : notification.flowTableAndStatisticsMap){
+            if (stats.tableId.value == 0){
+                val it = new NodeTableStatistics();
+                activeCount = stats.activeFlows.value.intValue;
+                lookupCount = stats.packetsLookedUp.value.longValue;
+                matchedCount = stats.packetsMatched.value.longValue;
+                
+                adsalFlowTableStatistics.add(it);
+            }
+        }
+        for (statsPublisher : statisticsPublisher){
+            val nodeRef = InstanceIdentifier.builder(Nodes).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node,new NodeKey(notification.id)).toInstance;
+            statsPublisher.nodeTableStatisticsUpdated(nodeRef.toADNode,adsalFlowTableStatistics);
+        }
+    }
+    
+    /*
+     * OpendaylightPortStatisticsUpdate interface implementation
+     */
+    override onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate notification) {
+        
+        val adsalPortStatistics  = new ArrayList<NodeConnectorStatistics>();
+        
+        for(nodeConnectorStatistics : notification.nodeConnectorStatisticsAndPortNumberMap){
+            adsalPortStatistics.add(toNodeConnectorStatistics(nodeConnectorStatistics,notification.id,nodeConnectorStatistics.nodeConnectorId));
+        }
+        
+        for (statsPublisher : statisticsPublisher){
+            val nodeRef = InstanceIdentifier.builder(Nodes).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node,new NodeKey(notification.id)).toInstance;
+            statsPublisher.nodeConnectorStatisticsUpdated(nodeRef.toADNode,adsalPortStatistics);
+        }
+        
+    }
+    
+    private static def toFlowOnNode (FlowAndStatisticsMapList flowAndStatsMap,Node node){
+        
+        val it = new FlowOnNode(ToSalConversionsUtils.toFlow(flowAndStatsMap,node));
+        
+        byteCount = flowAndStatsMap.byteCount.value.longValue;
+        packetCount = flowAndStatsMap.packetCount.value.longValue;
+        durationSeconds = flowAndStatsMap.duration.second.value.intValue;
+        durationNanoseconds = flowAndStatsMap.duration.nanosecond.value.intValue;
+        
+        return it;
+    }
+
+    override  getConfiguredNotConnectedNodes() {
         return Collections.emptySet();
-       }
+    }
+
+
+    private def publishNodeUpdate(Node node, UpdateType updateType, Set<org.opendaylight.controller.sal.core.Property> properties){
+        for( publisher : inventoryPublisher){
+            publisher.updateNode(node, updateType, properties);
+        }
+    }
+
+    private def publishNodeConnectorUpdate(org.opendaylight.controller.sal.core.NodeConnector nodeConnector, UpdateType updateType, Set<org.opendaylight.controller.sal.core.Property> properties){
+        for( publisher : inventoryPublisher){
+            publisher.updateNodeConnector(nodeConnector, updateType, properties);
+        }
+    }
+
+    private def isKnownNodeConnector(InstanceIdentifier<? extends Object> nodeConnectorIdentifier){
+        if(nodeConnectorIdentifier.path.size() < 3) {
+            return false;
+        }
 
+        val nodePath = nodeConnectorIdentifier.path.get(1);
+        val nodeConnectorPath = nodeConnectorIdentifier.getPath().get(2);
 
-       private def publishNodeUpdate(Node node, UpdateType updateType, Set<org.opendaylight.controller.sal.core.Property> properties){
-           for( publisher : inventoryPublisher){
-               publisher.updateNode(node, updateType, properties);
-           }
-       }
+        val nodeConnectors = nodeToNodeConnectorsMap.get(nodePath);
 
-       private def publishNodeConnectorUpdate(org.opendaylight.controller.sal.core.NodeConnector nodeConnector, UpdateType updateType, Set<org.opendaylight.controller.sal.core.Property> properties){
-           for( publisher : inventoryPublisher){
-               publisher.updateNodeConnector(nodeConnector, updateType, properties);
-           }
-       }
+        if(nodeConnectors == null){
+            return false;
+        }
+        return nodeConnectors.contains(nodeConnectorPath);
+    }
+
+
+    private def recordNodeConnector(InstanceIdentifier<? extends Object> nodeConnectorIdentifier){
+        if(nodeConnectorIdentifier.path.size() < 3) {
+            return false;
+        }
+
+        val nodePath = nodeConnectorIdentifier.path.get(1);
+        val nodeConnectorPath = nodeConnectorIdentifier.getPath().get(2);
+
+        nodeToNodeConnectorsLock.lock();
+
+        try {
+            var nodeConnectors = nodeToNodeConnectorsMap.get(nodePath);
+
+            if(nodeConnectors == null){
+                nodeConnectors = new ArrayList<InstanceIdentifier.PathArgument>();
+                nodeToNodeConnectorsMap.put(nodePath, nodeConnectors);
+            }
+
+            nodeConnectors.add(nodeConnectorPath);
+        } finally {
+            nodeToNodeConnectorsLock.unlock();
+        }
+    }
+
+    private def removeNodeConnectors(InstanceIdentifier<? extends Object> nodeIdentifier){
+        val nodePath = nodeIdentifier.path.get(1);
+
+        nodeToNodeConnectorsMap.remove(nodePath);
+    }
 }
diff --git a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/InventoryNotificationProvider.java b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/InventoryNotificationProvider.java
new file mode 100644 (file)
index 0000000..23a98ff
--- /dev/null
@@ -0,0 +1,59 @@
+package org.opendaylight.controller.sal.compatibility;
+
+import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.controller.sal.inventory.IPluginOutInventoryService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class InventoryNotificationProvider implements AutoCloseable{
+
+    private ListenerRegistration<DataChangeListener> nodeConnectorDataChangeListenerRegistration;
+
+    private NodeConnectorDataChangeListener nodeConnectorDataChangeListener;
+
+    private DataProviderService dataProviderService;
+
+    private List<IPluginOutInventoryService> inventoryPublisher;
+
+    private final static Logger LOG = LoggerFactory.getLogger(NodeConnectorDataChangeListener.class);
+
+    public void start(){
+
+        LOG.info("InventoryNotificationProvider started");
+
+        if(dataProviderService != null
+                && inventoryPublisher!= null){
+
+            if(nodeConnectorDataChangeListener == null){
+                InstanceIdentifier nodeConnectorPath = InstanceIdentifier.builder(Nodes.class).child(Node.class).child(NodeConnector.class).build();
+                nodeConnectorDataChangeListener = new NodeConnectorDataChangeListener();
+                nodeConnectorDataChangeListener.setInventoryPublisher(inventoryPublisher);
+                nodeConnectorDataChangeListenerRegistration = dataProviderService.registerDataChangeListener(nodeConnectorPath, nodeConnectorDataChangeListener);
+            }
+
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if(nodeConnectorDataChangeListenerRegistration != null){
+            nodeConnectorDataChangeListenerRegistration.close();
+        }
+    }
+
+    public void setDataProviderService(DataProviderService dataProviderService) {
+        this.dataProviderService = dataProviderService;
+    }
+
+    public void setInventoryPublisher(List<IPluginOutInventoryService> inventoryPublisher) {
+        this.inventoryPublisher = inventoryPublisher;
+    }
+}
diff --git a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/NodeConnectorDataChangeListener.java b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/NodeConnectorDataChangeListener.java
new file mode 100644 (file)
index 0000000..eebba74
--- /dev/null
@@ -0,0 +1,77 @@
+package org.opendaylight.controller.sal.compatibility;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
+import org.opendaylight.controller.sal.core.ConstructionException;
+import org.opendaylight.controller.sal.core.NodeConnector;
+import org.opendaylight.controller.sal.core.Property;
+import org.opendaylight.controller.sal.core.UpdateType;
+import org.opendaylight.controller.sal.inventory.IPluginOutInventoryService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+// org.opendaylight.controller.sal.compatibility.NodeConnectorDataChangeListener
+public class NodeConnectorDataChangeListener implements DataChangeListener{
+    private final static Logger LOG = LoggerFactory.getLogger(NodeConnectorDataChangeListener.class);
+
+    private List<IPluginOutInventoryService> inventoryPublisher;
+
+    public List<IPluginOutInventoryService> getInventoryPublisher() {
+      return this.inventoryPublisher;
+    }
+
+    public void setInventoryPublisher(final List<IPluginOutInventoryService> inventoryPublisher) {
+      this.inventoryPublisher = inventoryPublisher;
+    }
+
+    @Override
+    public void onDataChanged(DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+        final Map<InstanceIdentifier<?>,DataObject> createdOperationalData = change.getCreatedOperationalData();
+        final Map<InstanceIdentifier<?>,DataObject> updatedOperationalData = change.getUpdatedOperationalData();
+
+        final Set<Map.Entry<InstanceIdentifier<?>,DataObject>> createdEntries = createdOperationalData.entrySet();
+        final Set<Map.Entry<InstanceIdentifier<?>,DataObject>> updatedEntries = new HashSet<>();
+
+        updatedEntries.addAll(updatedOperationalData.entrySet());
+        updatedEntries.removeAll(createdEntries);
+
+        for(final Map.Entry<InstanceIdentifier<?>,DataObject> entry : createdEntries){
+            publishNodeConnectorUpdate(entry, UpdateType.ADDED);
+        }
+
+        for(final Map.Entry<InstanceIdentifier<?>,DataObject> entry : updatedEntries){
+            publishNodeConnectorUpdate(entry, UpdateType.CHANGED);
+        }
+    }
+
+    private void publishNodeConnectorUpdate(final Map.Entry<InstanceIdentifier<?>,DataObject> entry, final UpdateType updateType) {
+        if (entry.getKey().getTargetType().equals(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector.class)) {
+            NodeConnectorRef nodeConnectorRef = new NodeConnectorRef(entry.getKey());
+            NodeConnector nodeConnector = null;
+            try {
+                nodeConnector = NodeMapping.toADNodeConnector(nodeConnectorRef);
+            } catch (ConstructionException e) {
+                e.printStackTrace();
+            }
+            HashSet<Property> _aDNodeConnectorProperties = NodeMapping.toADNodeConnectorProperties((org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector) entry.getValue());
+            this.publishNodeConnectorUpdate(nodeConnector, updateType, _aDNodeConnectorProperties);
+        }
+    }
+
+    private void publishNodeConnectorUpdate(final NodeConnector nodeConnector, final UpdateType updateType, final Set<Property> properties) {
+      LOG.debug("Publishing NodeConnector " + updateType.toString() + " nodeConnector Id = " + nodeConnector.getNodeConnectorIdAsString());
+
+      List<IPluginOutInventoryService> _inventoryPublisher = getInventoryPublisher();
+      for (final IPluginOutInventoryService publisher : _inventoryPublisher) {
+        publisher.updateNodeConnector(nodeConnector, updateType, properties);
+      }
+    }
+}
index 1a66b3b..43f48a5 100644 (file)
@@ -64,6 +64,8 @@ class FlowCapableInventoryProvider implements AutoCloseable {
 
 class NodeChangeCommiter implements OpendaylightInventoryListener {
 
+    static val LOG = LoggerFactory.getLogger(NodeChangeCommiter);
+
     @Property
     val FlowCapableInventoryProvider manager;
 
@@ -76,6 +78,9 @@ class NodeChangeCommiter implements OpendaylightInventoryListener {
 
         // Check path
         val it = manager.startChange()
+
+        LOG.debug("removing node connector : " + ref.value.toString());
+
         removeOperationalData(ref.value as InstanceIdentifier<? extends DataObject>);
         commit()
     }
@@ -93,6 +98,8 @@ class NodeChangeCommiter implements OpendaylightInventoryListener {
             data.addAugmentation(FlowCapableNodeConnector, augment)
         }
 
+        LOG.debug("updating node connector : " + ref.value.toString());
+
         putOperationalData(ref.value as InstanceIdentifier<NodeConnector>, data.build());
         commit()
     }
@@ -101,6 +108,8 @@ class NodeChangeCommiter implements OpendaylightInventoryListener {
         val ref = node.nodeRef;
         val it = manager.startChange()
 
+        LOG.debug("removing node : " + ref.value.toString());
+
         removeOperationalData(ref.value as InstanceIdentifier<? extends DataObject>);
         commit()
     }
@@ -117,6 +126,8 @@ class NodeChangeCommiter implements OpendaylightInventoryListener {
             data.addAugmentation(FlowCapableNode, augment)
         }
 
+        LOG.debug("updating node : " + ref.value.toString());
+
         putOperationalData(ref.value as InstanceIdentifier<Node>, data.build())
         commit()
     }
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/AbstractStatsTracker.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/AbstractStatsTracker.java
new file mode 100644 (file)
index 0000000..aa7720c
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright IBM Corporation, 2013.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
+
+import com.google.common.base.Preconditions;
+
+abstract class AbstractStatsTracker<I, K> {
+    private final Map<K, Long> trackedItems = new HashMap<>();
+    private final InstanceIdentifier<Node> nodeIdentifier;
+    private final DataProviderService dps;
+    private final long lifetimeNanos;
+
+    protected AbstractStatsTracker(final InstanceIdentifier<Node> nodeIdentifier, final DataProviderService dps, long lifetimeNanos) {
+        this.nodeIdentifier = Preconditions.checkNotNull(nodeIdentifier);
+        this.dps = Preconditions.checkNotNull(dps);
+        this.lifetimeNanos = lifetimeNanos;
+    }
+
+    protected final InstanceIdentifierBuilder<Node> getNodeIdentifierBuilder() {
+        return InstanceIdentifier.builder(nodeIdentifier);
+    }
+
+    final synchronized void updateStats(List<I> list) {
+        final Long expiryTime = System.nanoTime() + lifetimeNanos;
+        final DataModificationTransaction trans = dps.beginTransaction();
+
+        for (final I item : list) {
+            trackedItems.put(updateSingleStat(trans, item), expiryTime);
+        }
+
+        trans.commit();
+    }
+
+
+    final synchronized void cleanup(final DataModificationTransaction trans, long now) {
+        for (Iterator<Entry<K, Long>> it = trackedItems.entrySet().iterator();it.hasNext();){
+            Entry<K, Long> e = it.next();
+            if (now > e.getValue()) {
+                cleanupSingleStat(trans, e.getKey());
+                it.remove();
+            }
+        }
+    }
+
+    protected abstract void cleanupSingleStat(DataModificationTransaction trans, K item);
+    protected abstract K updateSingleStat(DataModificationTransaction trans, I item);
+}
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowCapableTracker.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowCapableTracker.java
new file mode 100644 (file)
index 0000000..bb1544c
--- /dev/null
@@ -0,0 +1,111 @@
+/*
+ * Copyright IBM Corporation, 2013.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import java.util.Collection;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Sets;
+
+/**
+ * There is a single instance of this class and that instance is responsible for
+ * monitoring the operational data store for nodes being created/deleted and
+ * notifying StatisticsProvider. These events then control the lifecycle of
+ * NodeStatisticsHandler for a particular switch.
+ */
+final class FlowCapableTracker implements DataChangeListener {
+    private static final Logger logger = LoggerFactory.getLogger(FlowCapableTracker.class);
+
+    private final InstanceIdentifier<FlowCapableNode> root;
+    private final StatisticsProvider stats;
+
+    private final Predicate<InstanceIdentifier<?>> filterIdentifiers = new Predicate<InstanceIdentifier<?>>() {
+        @Override
+        public boolean apply(final InstanceIdentifier<?> input) {
+            /*
+             * This notification has been triggered either by the ancestor,
+             * descendant or directly for the FlowCapableNode itself. We
+             * are not interested descendants, so let's prune them based
+             * on the depth of their identifier.
+             */
+            if (root.getPath().size() < input.getPath().size()) {
+                logger.debug("Ignoring notification for descendant {}", input);
+                return false;
+            }
+
+            logger.debug("Including notification for {}", input);
+            return true;
+        }
+    };
+
+    public FlowCapableTracker(final StatisticsProvider stats, InstanceIdentifier<FlowCapableNode> root) {
+        this.stats = Preconditions.checkNotNull(stats);
+        this.root = Preconditions.checkNotNull(root);
+    }
+
+    /*
+     * This method is synchronized because we want to make sure to serialize input
+     * from the datastore. Competing add/remove could be problematic otherwise.
+     */
+    @Override
+    public synchronized void onDataChanged(final DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+        logger.debug("Tracker at root {} processing notification", root);
+
+        /*
+         * First process all the identifiers which were removed, trying to figure out
+         * whether they constitute removal of FlowCapableNode.
+         */
+        final Collection<NodeKey> removedNodes =
+            Collections2.filter(Collections2.transform(
+                Sets.filter(change.getRemovedOperationalData(), filterIdentifiers),
+                new Function<InstanceIdentifier<?>, NodeKey>() {
+                    @Override
+                    public NodeKey apply(final InstanceIdentifier<?> input) {
+                        final NodeKey key = input.firstKeyOf(Node.class, NodeKey.class);
+                        if (key == null) {
+                            // FIXME: do we have a backup plan?
+                            logger.info("Failed to extract node key from {}", input);
+                        }
+                        return key;
+                    }
+                }), Predicates.notNull());
+        stats.stopNodeHandlers(removedNodes);
+
+        final Collection<NodeKey> addedNodes =
+            Collections2.filter(Collections2.transform(
+                Sets.filter(change.getCreatedOperationalData().keySet(), filterIdentifiers),
+                new Function<InstanceIdentifier<?>, NodeKey>() {
+                    @Override
+                    public NodeKey apply(final InstanceIdentifier<?> input) {
+                        final NodeKey key = input.firstKeyOf(Node.class, NodeKey.class);
+                        if (key == null) {
+                            // FIXME: do we have a backup plan?
+                            logger.info("Failed to extract node key from {}", input);
+                    }
+                    return key;
+                }
+            }), Predicates.notNull());
+        stats.startNodeHandlers(addedNodes);
+
+        logger.debug("Tracker at root {} finished processing notification", root);
+    }
+}
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowStatsEntry.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowStatsEntry.java
new file mode 100644 (file)
index 0000000..b5b39d9
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright IBM Corporation, 2013.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+
+final class FlowStatsEntry {
+    private final Short tableId;
+    private final Flow flow;
+
+    public FlowStatsEntry(Short tableId, Flow flow){
+        this.tableId = tableId;
+        this.flow = flow;
+    }
+
+    public Short getTableId() {
+        return tableId;
+    }
+
+    public Flow getFlow() {
+        return flow;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((flow == null) ? 0 : flow.hashCode());
+        result = prime * result + ((tableId == null) ? 0 : tableId.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        FlowStatsEntry other = (FlowStatsEntry) obj;
+        if (flow == null) {
+            if (other.flow != null)
+                return false;
+        } else if (!flow.equals(other.flow))
+            return false;
+        if (tableId == null) {
+            if (other.tableId != null)
+                return false;
+        } else if (!tableId.equals(other.tableId))
+            return false;
+        return true;
+    }
+}
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowStatsTracker.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowStatsTracker.java
new file mode 100644 (file)
index 0000000..e185437
--- /dev/null
@@ -0,0 +1,186 @@
+/*
+ * Copyright IBM Corporation, 2013.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class FlowStatsTracker extends AbstractStatsTracker<FlowAndStatisticsMapList, FlowStatsEntry> {
+    private static final Logger logger = LoggerFactory.getLogger(FlowStatsTracker.class);
+    private int unaccountedFlowsCounter = 1;
+
+    FlowStatsTracker(InstanceIdentifier<Node> nodeIdentifier, DataProviderService dps, long lifetimeNanos) {
+        super(nodeIdentifier, dps, lifetimeNanos);
+    }
+
+    @Override
+    protected void cleanupSingleStat(DataModificationTransaction trans, FlowStatsEntry item) {
+        InstanceIdentifier<?> flowRef = getNodeIdentifierBuilder()
+                            .augmentation(FlowCapableNode.class)
+                            .child(Table.class, new TableKey(item.getTableId()))
+                            .child(Flow.class,item.getFlow().getKey())
+                            .augmentation(FlowStatisticsData.class).toInstance();
+        trans.removeOperationalData(flowRef);
+    }
+
+    @Override
+    protected FlowStatsEntry updateSingleStat(DataModificationTransaction trans, FlowAndStatisticsMapList map) {
+        short tableId = map.getTableId();
+
+        FlowBuilder flowBuilder = new FlowBuilder();
+
+        FlowStatisticsDataBuilder flowStatisticsData = new FlowStatisticsDataBuilder();
+
+        FlowBuilder flow = new FlowBuilder();
+        flow.setContainerName(map.getContainerName());
+        flow.setBufferId(map.getBufferId());
+        flow.setCookie(map.getCookie());
+        flow.setCookieMask(map.getCookieMask());
+        flow.setFlags(map.getFlags());
+        flow.setFlowName(map.getFlowName());
+        flow.setHardTimeout(map.getHardTimeout());
+        if(map.getFlowId() != null)
+            flow.setId(new FlowId(map.getFlowId().getValue()));
+        flow.setIdleTimeout(map.getIdleTimeout());
+        flow.setInstallHw(map.isInstallHw());
+        flow.setInstructions(map.getInstructions());
+        if(map.getFlowId()!= null)
+            flow.setKey(new FlowKey(new FlowId(map.getKey().getFlowId().getValue())));
+        flow.setMatch(map.getMatch());
+        flow.setOutGroup(map.getOutGroup());
+        flow.setOutPort(map.getOutPort());
+        flow.setPriority(map.getPriority());
+        flow.setStrict(map.isStrict());
+        flow.setTableId(tableId);
+
+        Flow flowRule = flow.build();
+
+        FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder();
+        stats.setByteCount(map.getByteCount());
+        stats.setPacketCount(map.getPacketCount());
+        stats.setDuration(map.getDuration());
+
+        GenericStatistics flowStats = stats.build();
+
+        //Augment the data to the flow node
+
+        FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder();
+        flowStatistics.setByteCount(flowStats.getByteCount());
+        flowStatistics.setPacketCount(flowStats.getPacketCount());
+        flowStatistics.setDuration(flowStats.getDuration());
+        flowStatistics.setContainerName(map.getContainerName());
+        flowStatistics.setBufferId(map.getBufferId());
+        flowStatistics.setCookie(map.getCookie());
+        flowStatistics.setCookieMask(map.getCookieMask());
+        flowStatistics.setFlags(map.getFlags());
+        flowStatistics.setFlowName(map.getFlowName());
+        flowStatistics.setHardTimeout(map.getHardTimeout());
+        flowStatistics.setIdleTimeout(map.getIdleTimeout());
+        flowStatistics.setInstallHw(map.isInstallHw());
+        flowStatistics.setInstructions(map.getInstructions());
+        flowStatistics.setMatch(map.getMatch());
+        flowStatistics.setOutGroup(map.getOutGroup());
+        flowStatistics.setOutPort(map.getOutPort());
+        flowStatistics.setPriority(map.getPriority());
+        flowStatistics.setStrict(map.isStrict());
+        flowStatistics.setTableId(tableId);
+
+        flowStatisticsData.setFlowStatistics(flowStatistics.build());
+
+        logger.debug("Flow : {}",flowRule.toString());
+        logger.debug("Statistics to augment : {}",flowStatistics.build().toString());
+
+        InstanceIdentifier<Table> tableRef = getNodeIdentifierBuilder()
+                .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
+
+        //TODO: Not a good way to do it, need to figure out better way.
+        //TODO: major issue in any alternate approach is that flow key is incrementally assigned
+        //to the flows stored in data store.
+        // Augment same statistics to all the matching masked flow
+        Table table= (Table)trans.readConfigurationData(tableRef);
+        if(table != null){
+            for(Flow existingFlow : table.getFlow()){
+                logger.debug("Existing flow in data store : {}",existingFlow.toString());
+                if(FlowComparator.flowEquals(flowRule,existingFlow)){
+                    InstanceIdentifier<Flow> flowRef = getNodeIdentifierBuilder()
+                            .augmentation(FlowCapableNode.class)
+                            .child(Table.class, new TableKey(tableId))
+                            .child(Flow.class,existingFlow.getKey()).toInstance();
+                    flowBuilder.setKey(existingFlow.getKey());
+                    flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
+                    logger.debug("Found matching flow in the datastore, augmenting statistics");
+                    // Update entry with timestamp of latest response
+                    flow.setKey(existingFlow.getKey());
+                    FlowStatsEntry flowStatsEntry = new FlowStatsEntry(tableId,flow.build());
+                    trans.putOperationalData(flowRef, flowBuilder.build());
+                    return flowStatsEntry;
+                }
+            }
+        }
+
+        table = (Table)trans.readOperationalData(tableRef);
+        if(table != null){
+            for(Flow existingFlow : table.getFlow()){
+                FlowStatisticsData augmentedflowStatisticsData = existingFlow.getAugmentation(FlowStatisticsData.class);
+                if(augmentedflowStatisticsData != null){
+                    FlowBuilder existingOperationalFlow = new FlowBuilder();
+                    existingOperationalFlow.fieldsFrom(augmentedflowStatisticsData.getFlowStatistics());
+                    logger.debug("Existing unaccounted flow in operational data store : {}",existingFlow.toString());
+                    if(FlowComparator.flowEquals(flowRule,existingOperationalFlow.build())){
+                        InstanceIdentifier<Flow> flowRef = getNodeIdentifierBuilder()
+                                .augmentation(FlowCapableNode.class)
+                                .child(Table.class, new TableKey(tableId))
+                                .child(Flow.class,existingFlow.getKey()).toInstance();
+                        flowBuilder.setKey(existingFlow.getKey());
+                        flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
+                        logger.debug("Found matching unaccounted flow in the operational datastore, augmenting statistics");
+                        // Update entry with timestamp of latest response
+                        flow.setKey(existingFlow.getKey());
+                        FlowStatsEntry flowStatsEntry = new FlowStatsEntry(tableId,flow.build());
+                        trans.putOperationalData(flowRef, flowBuilder.build());
+                        return flowStatsEntry;
+                    }
+                }
+            }
+        }
+
+        String flowKey = "#UF$TABLE*"+Short.toString(tableId)+"*"+Integer.toString(this.unaccountedFlowsCounter);
+        this.unaccountedFlowsCounter++;
+        FlowKey newFlowKey = new FlowKey(new FlowId(flowKey));
+        InstanceIdentifier<Flow> flowRef = getNodeIdentifierBuilder().augmentation(FlowCapableNode.class)
+                    .child(Table.class, new TableKey(tableId))
+                    .child(Flow.class,newFlowKey).toInstance();
+        flowBuilder.setKey(newFlowKey);
+        flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
+        logger.debug("Flow {} is not present in config data store, augmenting statistics as an unaccounted flow",
+                    flowBuilder.build());
+
+        // Update entry with timestamp of latest response
+        flow.setKey(newFlowKey);
+        FlowStatsEntry flowStatsEntry = new FlowStatsEntry(tableId,flow.build());
+        trans.putOperationalData(flowRef, flowBuilder.build());
+        return flowStatsEntry;
+    }
+}
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowTableStatsTracker.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/FlowTableStatsTracker.java
new file mode 100644 (file)
index 0000000..2544d55
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Copyright IBM Corporation, 2013.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+final class FlowTableStatsTracker extends AbstractStatsTracker<FlowTableAndStatisticsMap, FlowTableAndStatisticsMap> {
+    private final Set<TableKey> privateTables = new ConcurrentSkipListSet<>();
+    private final Set<TableKey> tables = Collections.unmodifiableSet(privateTables);
+
+    FlowTableStatsTracker(InstanceIdentifier<Node> nodeIdentifier, DataProviderService dps, long lifetimeNanos) {
+        super(nodeIdentifier, dps, lifetimeNanos);
+    }
+
+    Set<TableKey> getTables() {
+        return tables;
+    }
+
+    @Override
+    protected void cleanupSingleStat(DataModificationTransaction trans, FlowTableAndStatisticsMap item) {
+        // TODO: do we want to do this?
+    }
+
+    @Override
+    protected FlowTableAndStatisticsMap updateSingleStat(DataModificationTransaction trans, FlowTableAndStatisticsMap item) {
+
+        InstanceIdentifier<Table> tableRef = getNodeIdentifierBuilder()
+                .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(item.getTableId().getValue())).build();
+
+        FlowTableStatisticsDataBuilder statisticsDataBuilder = new FlowTableStatisticsDataBuilder();
+        final FlowTableStatistics stats = new FlowTableStatisticsBuilder(item).build();
+        statisticsDataBuilder.setFlowTableStatistics(stats);
+
+        TableBuilder tableBuilder = new TableBuilder();
+        tableBuilder.setKey(new TableKey(item.getTableId().getValue()));
+        tableBuilder.addAugmentation(FlowTableStatisticsData.class, statisticsDataBuilder.build());
+        trans.putOperationalData(tableRef, tableBuilder.build());
+        return item;
+    }
+}
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/GroupDescStatsTracker.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/GroupDescStatsTracker.java
new file mode 100644 (file)
index 0000000..928bf4e
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright IBM Corporation, 2013.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStatsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.desc.GroupDescBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+final class GroupDescStatsTracker extends AbstractStatsTracker<GroupDescStats, GroupDescStats> {
+    public GroupDescStatsTracker(final InstanceIdentifier<Node> targetNodeIdentifier, final DataProviderService dps, final long lifetimeNanos) {
+        super(targetNodeIdentifier, dps, lifetimeNanos);
+    }
+
+    @Override
+    protected GroupDescStats updateSingleStat(DataModificationTransaction trans, GroupDescStats item) {
+        GroupBuilder groupBuilder = new GroupBuilder();
+        GroupKey groupKey = new GroupKey(item.getGroupId());
+        groupBuilder.setKey(groupKey);
+
+        InstanceIdentifier<Group> groupRef = getNodeIdentifierBuilder()
+                .augmentation(FlowCapableNode.class).child(Group.class,groupKey).build();
+
+        NodeGroupDescStatsBuilder groupDesc= new NodeGroupDescStatsBuilder();
+        groupDesc.setGroupDesc(new GroupDescBuilder(item).build());
+
+        //Update augmented data
+        groupBuilder.addAugmentation(NodeGroupDescStats.class, groupDesc.build());
+
+        trans.putOperationalData(groupRef, groupBuilder.build());
+        return item;
+    }
+
+    @Override
+    protected void cleanupSingleStat(DataModificationTransaction trans, GroupDescStats item) {
+        InstanceIdentifier<NodeGroupDescStats> groupRef = getNodeIdentifierBuilder().augmentation(FlowCapableNode.class)
+                .child(Group.class, new GroupKey(item.getGroupId())).augmentation(NodeGroupDescStats.class).build();
+        trans.removeOperationalData(groupRef);
+    }
+}
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/GroupStatsTracker.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/GroupStatsTracker.java
new file mode 100644 (file)
index 0000000..a5498f5
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Copyright IBM Corporation, 2013.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.statistics.GroupStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+final class GroupStatsTracker extends AbstractStatsTracker<GroupStats, GroupStats> {
+
+    GroupStatsTracker(InstanceIdentifier<Node> nodeIdentifier, DataProviderService dps, long lifetimeNanos) {
+        super(nodeIdentifier, dps, lifetimeNanos);
+    }
+
+    @Override
+    protected void cleanupSingleStat(DataModificationTransaction trans, GroupStats item) {
+        InstanceIdentifier<NodeGroupStatistics> groupRef = getNodeIdentifierBuilder().augmentation(FlowCapableNode.class)
+                .child(Group.class, new GroupKey(item.getGroupId())).augmentation(NodeGroupStatistics.class).build();
+        trans.removeOperationalData(groupRef);
+    }
+
+    @Override
+    protected GroupStats updateSingleStat(DataModificationTransaction trans,
+            GroupStats item) {
+        GroupBuilder groupBuilder = new GroupBuilder();
+        GroupKey groupKey = new GroupKey(item.getGroupId());
+        groupBuilder.setKey(groupKey);
+
+        InstanceIdentifier<Group> groupRef = getNodeIdentifierBuilder().augmentation(FlowCapableNode.class)
+                .child(Group.class,groupKey).build();
+
+        NodeGroupStatisticsBuilder groupStatisticsBuilder= new NodeGroupStatisticsBuilder();
+        groupStatisticsBuilder.setGroupStatistics(new GroupStatisticsBuilder(item).build());
+
+        //Update augmented data
+        groupBuilder.addAugmentation(NodeGroupStatistics.class, groupStatisticsBuilder.build());
+        trans.putOperationalData(groupRef, groupBuilder.build());
+        return item;
+    }
+
+}
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MeterConfigStatsTracker.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MeterConfigStatsTracker.java
new file mode 100644 (file)
index 0000000..dcb0b40
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * Copyright IBM Corporation, 2013.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStatsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterConfigStatsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+final class MeterConfigStatsTracker extends AbstractStatsTracker<MeterConfigStats, MeterConfigStats> {
+    protected MeterConfigStatsTracker(InstanceIdentifier<Node> nodeIdentifier, DataProviderService dps, long lifetimeNanos) {
+        super(nodeIdentifier, dps, lifetimeNanos);
+    }
+
+    @Override
+    protected void cleanupSingleStat(DataModificationTransaction trans, MeterConfigStats item) {
+        InstanceIdentifier<NodeMeterConfigStats> meterRef = getNodeIdentifierBuilder()
+                            .augmentation(FlowCapableNode.class)
+                            .child(Meter.class, new MeterKey(item.getMeterId()))
+                            .augmentation(NodeMeterConfigStats.class).build();
+        trans.removeOperationalData(meterRef);
+    }
+
+    @Override
+    protected MeterConfigStats updateSingleStat(DataModificationTransaction trans, MeterConfigStats item) {
+        MeterBuilder meterBuilder = new MeterBuilder();
+        MeterKey meterKey = new MeterKey(item.getMeterId());
+        meterBuilder.setKey(meterKey);
+
+        InstanceIdentifier<Meter> meterRef = getNodeIdentifierBuilder().augmentation(FlowCapableNode.class)
+                .child(Meter.class,meterKey).toInstance();
+
+        NodeMeterConfigStatsBuilder meterConfig = new NodeMeterConfigStatsBuilder();
+        meterConfig.setMeterConfigStats(new MeterConfigStatsBuilder(item).build());
+
+        //Update augmented data
+        meterBuilder.addAugmentation(NodeMeterConfigStats.class, meterConfig.build());
+
+        trans.putOperationalData(meterRef, meterBuilder.build());
+        return item;
+    }
+
+}
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MeterStatsTracker.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MeterStatsTracker.java
new file mode 100644 (file)
index 0000000..381db8a
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Copyright IBM Corporation, 2013.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+final class MeterStatsTracker extends AbstractStatsTracker<MeterStats, MeterStats> {
+
+    MeterStatsTracker(InstanceIdentifier<Node> nodeIdentifier, DataProviderService dps, long lifetimeNanos) {
+        super(nodeIdentifier, dps, lifetimeNanos);
+    }
+
+    @Override
+    protected void cleanupSingleStat(DataModificationTransaction trans, MeterStats item) {
+        InstanceIdentifier<NodeMeterStatistics> meterRef = getNodeIdentifierBuilder()
+                            .augmentation(FlowCapableNode.class)
+                            .child(Meter.class,new MeterKey(item.getMeterId()))
+                            .augmentation(NodeMeterStatistics.class).build();
+        trans.removeOperationalData(meterRef);
+    }
+
+    @Override
+    protected MeterStats updateSingleStat(DataModificationTransaction trans, MeterStats item) {
+        MeterBuilder meterBuilder = new MeterBuilder();
+        MeterKey meterKey = new MeterKey(item.getMeterId());
+        meterBuilder.setKey(meterKey);
+
+        InstanceIdentifier<Meter> meterRef = getNodeIdentifierBuilder()
+                .augmentation(FlowCapableNode.class).child(Meter.class,meterKey).build();
+
+        NodeMeterStatisticsBuilder meterStatsBuilder= new NodeMeterStatisticsBuilder();
+        meterStatsBuilder.setMeterStatistics(new MeterStatisticsBuilder(item).build());
+
+        //Update augmented data
+        meterBuilder.addAugmentation(NodeMeterStatistics.class, meterStatsBuilder.build());
+        trans.putOperationalData(meterRef, meterBuilder.build());
+        return item;
+    }
+}
index 6f58708..0ce551a 100644 (file)
@@ -7,54 +7,49 @@
  */
 package org.opendaylight.controller.md.statistics.manager;
 
-import java.util.Date;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+
+import com.google.common.base.Preconditions;
 
 /**
- * Main responsibility of the class is to manage multipart response 
+ * Main responsibility of the class is to manage multipart response
  * for multipart request. It also handles the flow aggregate request
- * and response mapping. 
+ * and response mapping.
  * @author avishnoi@in.ibm.com
  *
  */
 public class MultipartMessageManager {
+    private static final int NUMBER_OF_WAIT_CYCLES = 2;
 
     /*
-     *  Map for tx id and type of request, to keep track of all the request sent 
-     *  by Statistics Manager. Statistics Manager won't entertain any multipart 
-     *  response for which it didn't send the request.  
+     *  Map for tx id and type of request, to keep track of all the request sent
+     *  by Statistics Manager. Statistics Manager won't entertain any multipart
+     *  response for which it didn't send the request.
      */
-    
-    private static Map<TxIdEntry,Date> txIdToRequestTypeMap = new ConcurrentHashMap<TxIdEntry,Date>();
+    private final Map<TxIdEntry,Long> txIdToRequestTypeMap = new HashMap<>();
     /*
      * Map to keep track of the request tx id for flow table statistics request.
      * Because flow table statistics multi part response do not contains the table id.
      */
-    private static Map<TxIdEntry,Short> txIdTotableIdMap = new ConcurrentHashMap<TxIdEntry,Short>();
-    
-    private final int NUMBER_OF_WAIT_CYCLES =2;
+    private final Map<TxIdEntry,Short> txIdTotableIdMap = new HashMap<>();
 
-    class TxIdEntry{
-        private final TransactionId txId;
-        private final NodeId nodeId;
+    private static final class TxIdEntry {
         private final StatsRequestType requestType;
-        
-        public TxIdEntry(NodeId nodeId, TransactionId txId, StatsRequestType requestType){
+        private final TransactionId txId;
+
+        public TxIdEntry(TransactionId txId, StatsRequestType requestType){
             this.txId = txId;
-            this.nodeId = nodeId;
             this.requestType = requestType;
         }
         public TransactionId getTxId() {
             return txId;
         }
-        public NodeId getNodeId() {
-            return nodeId;
-        }
         public StatsRequestType getRequestType() {
             return requestType;
         }
@@ -62,8 +57,6 @@ public class MultipartMessageManager {
         public int hashCode() {
             final int prime = 31;
             int result = 1;
-            result = prime * result + getOuterType().hashCode();
-            result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
             result = prime * result + ((txId == null) ? 0 : txId.hashCode());
             return result;
         }
@@ -79,16 +72,7 @@ public class MultipartMessageManager {
                 return false;
             }
             TxIdEntry other = (TxIdEntry) obj;
-            if (!getOuterType().equals(other.getOuterType())) {
-                return false;
-            }
-            if (nodeId == null) {
-                if (other.nodeId != null) {
-                    return false;
-                }
-            } else if (!nodeId.equals(other.nodeId)) {
-                return false;
-            }
+
             if (txId == null) {
                 if (other.txId != null) {
                     return false;
@@ -98,52 +82,48 @@ public class MultipartMessageManager {
             }
             return true;
         }
-        private MultipartMessageManager getOuterType() {
-            return MultipartMessageManager.this;
-        }
+
         @Override
         public String toString() {
-            return "TxIdEntry [txId=" + txId + ", nodeId=" + nodeId + ", requestType=" + requestType + "]";
+            return "TxIdEntry [txId=" + txId + ", requestType=" + requestType + "]";
         }
     }
 
-    public MultipartMessageManager(){}
-    
-    public Short getTableIdForTxId(NodeId nodeId,TransactionId id){
-        
-        return txIdTotableIdMap.get(new TxIdEntry(nodeId,id,null));
-        
+    public void recordExpectedTableTransaction(TransactionId id, StatsRequestType type, Short tableId) {
+        recordExpectedTransaction(id, type);
+        txIdTotableIdMap.put(new TxIdEntry(id, null), Preconditions.checkNotNull(tableId));
     }
-    
-    public void setTxIdAndTableIdMapEntry(NodeId nodeId, TransactionId id,Short tableId){
-        if(id == null)
-            return;
-        txIdTotableIdMap.put(new TxIdEntry(nodeId,id,null), tableId);
-    }
-    
-    public boolean isRequestTxIdExist(NodeId nodeId, TransactionId id, Boolean moreRepliesToFollow){
-        TxIdEntry entry = new TxIdEntry(nodeId,id,null);
-        if(moreRepliesToFollow.booleanValue()){
-            return txIdToRequestTypeMap.containsKey(entry);
-        }else{
-            return txIdToRequestTypeMap.remove(entry)==null?false:true;
+
+    public Short isExpectedTableTransaction(TransactionAware transaction, Boolean more) {
+        if (!isExpectedTransaction(transaction, more)) {
+            return null;
+        }
+
+        final TxIdEntry key = new TxIdEntry(transaction.getTransactionId(), null);
+        if (more != null && more.booleanValue()) {
+            return txIdTotableIdMap.get(key);
+        } else {
+            return txIdTotableIdMap.remove(key);
         }
     }
-    public void addTxIdToRequestTypeEntry (NodeId nodeId, TransactionId id,StatsRequestType type){
-        if(id == null)
-            return;
-        TxIdEntry entry = new TxIdEntry(nodeId,id,type);
+
+    public void recordExpectedTransaction(TransactionId id, StatsRequestType type) {
+        TxIdEntry entry = new TxIdEntry(Preconditions.checkNotNull(id), Preconditions.checkNotNull(type));
         txIdToRequestTypeMap.put(entry, getExpiryTime());
     }
-    public boolean removeTxId(NodeId nodeId, TransactionId id){
-        TxIdEntry entry = new TxIdEntry(nodeId,id,null);
-        return txIdToRequestTypeMap.remove(entry)==null?false:true;
+
+    public boolean isExpectedTransaction(TransactionAware transaction, Boolean more) {
+        TxIdEntry entry = new TxIdEntry(transaction.getTransactionId(), null);
+        if (more != null && more.booleanValue()) {
+            return txIdToRequestTypeMap.containsKey(entry);
+        } else {
+            return txIdToRequestTypeMap.remove(entry) != null;
+        }
     }
-    
-    private Date getExpiryTime(){
-        Date expires = new Date();
-        expires.setTime(expires.getTime()+StatisticsProvider.STATS_THREAD_EXECUTION_TIME*NUMBER_OF_WAIT_CYCLES);
-        return expires;
+
+    private static Long getExpiryTime(){
+        return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(
+                StatisticsProvider.STATS_COLLECTION_MILLIS*NUMBER_OF_WAIT_CYCLES);
     }
 
     public enum StatsRequestType{
@@ -157,16 +137,18 @@ public class MultipartMessageManager {
         GROUP_DESC,
         METER_CONFIG
     }
-    
+
     public void cleanStaleTransactionIds(){
+        final long now = System.nanoTime();
+
         for (Iterator<TxIdEntry> it = txIdToRequestTypeMap.keySet().iterator();it.hasNext();){
             TxIdEntry txIdEntry = it.next();
-            Date now = new Date();
-            Date expiryTime = txIdToRequestTypeMap.get(txIdEntry);
-            if(now.after(expiryTime)){
+
+            Long expiryTime = txIdToRequestTypeMap.get(txIdEntry);
+            if(now > expiryTime){
                 it.remove();
                 txIdTotableIdMap.remove(txIdEntry);
-            }            
+            }
         }
     }
 }
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeConnectorStatsTracker.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeConnectorStatsTracker.java
new file mode 100644 (file)
index 0000000..2a8b180
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright IBM Corporation, 2013.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.flow.capable.node.connector.statistics.FlowCapableNodeConnectorStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class NodeConnectorStatsTracker extends AbstractStatsTracker<NodeConnectorStatisticsAndPortNumberMap, NodeConnectorStatisticsAndPortNumberMap> {
+    private static final Logger logger = LoggerFactory.getLogger(NodeConnectorStatsTracker.class);
+
+    NodeConnectorStatsTracker(InstanceIdentifier<Node> nodeIdentifier, DataProviderService dps, long lifetimeNanos) {
+        super(nodeIdentifier, dps, lifetimeNanos);
+    }
+
+    @Override
+    protected void cleanupSingleStat(DataModificationTransaction trans, NodeConnectorStatisticsAndPortNumberMap item) {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    protected NodeConnectorStatisticsAndPortNumberMap updateSingleStat(DataModificationTransaction trans, NodeConnectorStatisticsAndPortNumberMap item) {
+        FlowCapableNodeConnectorStatisticsBuilder statisticsBuilder
+                                        = new FlowCapableNodeConnectorStatisticsBuilder();
+        statisticsBuilder.setBytes(item.getBytes());
+        statisticsBuilder.setCollisionCount(item.getCollisionCount());
+        statisticsBuilder.setDuration(item.getDuration());
+        statisticsBuilder.setPackets(item.getPackets());
+        statisticsBuilder.setReceiveCrcError(item.getReceiveCrcError());
+        statisticsBuilder.setReceiveDrops(item.getReceiveDrops());
+        statisticsBuilder.setReceiveErrors(item.getReceiveErrors());
+        statisticsBuilder.setReceiveFrameError(item.getReceiveFrameError());
+        statisticsBuilder.setReceiveOverRunError(item.getReceiveOverRunError());
+        statisticsBuilder.setTransmitDrops(item.getTransmitDrops());
+        statisticsBuilder.setTransmitErrors(item.getTransmitErrors());
+
+        //Augment data to the node-connector
+        FlowCapableNodeConnectorStatisticsDataBuilder statisticsDataBuilder =
+                new FlowCapableNodeConnectorStatisticsDataBuilder();
+
+        statisticsDataBuilder.setFlowCapableNodeConnectorStatistics(statisticsBuilder.build());
+
+        InstanceIdentifier<NodeConnector> nodeConnectorRef = getNodeIdentifierBuilder()
+                .child(NodeConnector.class, new NodeConnectorKey(item.getNodeConnectorId())).build();
+
+        // FIXME: can we bypass this read?
+        NodeConnector nodeConnector = (NodeConnector)trans.readOperationalData(nodeConnectorRef);
+        if(nodeConnector != null){
+            final FlowCapableNodeConnectorStatisticsData stats = statisticsDataBuilder.build();
+            logger.debug("Augmenting port statistics {} to port {}",stats,nodeConnectorRef.toString());
+            NodeConnectorBuilder nodeConnectorBuilder = new NodeConnectorBuilder();
+            nodeConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, stats);
+            trans.putOperationalData(nodeConnectorRef, nodeConnectorBuilder.build());
+        }
+
+        return item;
+    }
+}
index 395bacb..5d5d172 100644 (file)
@@ -7,90 +7,79 @@
  */
 package org.opendaylight.controller.md.statistics.manager;
 
-import java.util.HashMap;
-import java.util.Iterator;
+import java.util.Collection;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsData;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatisticsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStatsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeaturesBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatisticsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.desc.GroupDescBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeaturesBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.statistics.GroupStatisticsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStatsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeaturesBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeaturesBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterConfigStatsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterStatisticsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.AggregateFlowStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.flow.capable.node.connector.statistics.FlowCapableNodeConnectorStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsDataBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.flow.capable.node.connector.queue.statistics.FlowCapableNodeConnectorQueueStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.queue.id.and.statistics.map.QueueIdAndStatisticsMap;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,363 +92,123 @@ import com.google.common.base.Preconditions;
  *
  * @author avishnoi@in.ibm.com
  */
-public class NodeStatisticsHandler {
+public final class NodeStatisticsHandler implements AutoCloseable {
     private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class);
     private static final int NUMBER_OF_WAIT_CYCLES = 2;
 
-    private final Map<GroupDescStats,Long> groupDescStatsUpdate = new HashMap<>();
-    private final Map<MeterConfigStats,Long> meterConfigStatsUpdate = new HashMap<>();
-    private final Map<FlowEntry,Long> flowStatsUpdate = new HashMap<>();
-    private final Map<QueueEntry,Long> queuesStatsUpdate = new HashMap<>();
+    private final OpendaylightFlowStatisticsService flowStatsService;
+    private final OpendaylightFlowTableStatisticsService flowTableStatsService;
+    private final OpendaylightGroupStatisticsService groupStatsService;
+    private final OpendaylightMeterStatisticsService meterStatsService;
+    private final OpendaylightPortStatisticsService portStatsService;
+    private final OpendaylightQueueStatisticsService queueStatsService;
+
+    private final MultipartMessageManager msgManager = new MultipartMessageManager();
     private final InstanceIdentifier<Node> targetNodeIdentifier;
-    private final StatisticsProvider statisticsProvider;
+    private final FlowStatsTracker flowStats;
+    private final FlowTableStatsTracker flowTableStats;
+    private final GroupDescStatsTracker groupDescStats;
+    private final GroupStatsTracker groupStats;
+    private final MeterConfigStatsTracker meterConfigStats;
+    private final MeterStatsTracker meterStats;
+    private final NodeConnectorStatsTracker nodeConnectorStats;
+    private final QueueStatsTracker queueStats;
+    private final DataProviderService dps;
+    private final NodeRef targetNodeRef;
     private final NodeKey targetNodeKey;
-    private int unaccountedFlowsCounter = 1;
 
-    public NodeStatisticsHandler(StatisticsProvider statisticsProvider, NodeKey nodeKey){
-        this.statisticsProvider = Preconditions.checkNotNull(statisticsProvider);
+    public NodeStatisticsHandler(final DataProviderService dps, final NodeKey nodeKey,
+            final OpendaylightFlowStatisticsService flowStatsService,
+            final OpendaylightFlowTableStatisticsService flowTableStatsService,
+            final OpendaylightGroupStatisticsService groupStatsService,
+            final OpendaylightMeterStatisticsService meterStatsService,
+            final OpendaylightPortStatisticsService portStatsService,
+            final OpendaylightQueueStatisticsService queueStatsService) {
+        this.dps = Preconditions.checkNotNull(dps);
         this.targetNodeKey = Preconditions.checkNotNull(nodeKey);
         this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
-    }
-
-    private static class FlowEntry {
-        private final Short tableId;
-        private final Flow flow;
-
-        public FlowEntry(Short tableId, Flow flow){
-            this.tableId = tableId;
-            this.flow = flow;
-        }
-
-        public Short getTableId() {
-            return tableId;
-        }
-
-        public Flow getFlow() {
-            return flow;
-        }
-
-        @Override
-        public int hashCode() {
-            final int prime = 31;
-            int result = 1;
-            result = prime * result + ((flow == null) ? 0 : flow.hashCode());
-            result = prime * result + ((tableId == null) ? 0 : tableId.hashCode());
-            return result;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj)
-                return true;
-            if (obj == null)
-                return false;
-            if (getClass() != obj.getClass())
-                return false;
-            FlowEntry other = (FlowEntry) obj;
-            if (flow == null) {
-                if (other.flow != null)
-                    return false;
-            } else if (!flow.equals(other.flow))
-                return false;
-            if (tableId == null) {
-                if (other.tableId != null)
-                    return false;
-            } else if (!tableId.equals(other.tableId))
-                return false;
-            return true;
-        }
-    }
-
-    private static final class QueueEntry{
-        private final NodeConnectorId nodeConnectorId;
-        private final QueueId queueId;
-        public QueueEntry(NodeConnectorId ncId, QueueId queueId){
-            this.nodeConnectorId = ncId;
-            this.queueId = queueId;
-        }
-        public NodeConnectorId getNodeConnectorId() {
-            return nodeConnectorId;
-        }
-        public QueueId getQueueId() {
-            return queueId;
-        }
-        @Override
-        public int hashCode() {
-            final int prime = 31;
-            int result = 1;
-            result = prime * result + ((nodeConnectorId == null) ? 0 : nodeConnectorId.hashCode());
-            result = prime * result + ((queueId == null) ? 0 : queueId.hashCode());
-            return result;
-        }
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj) {
-                return true;
-            }
-            if (obj == null) {
-                return false;
-            }
-            if (!(obj instanceof QueueEntry)) {
-                return false;
-            }
-            QueueEntry other = (QueueEntry) obj;
-            if (nodeConnectorId == null) {
-                if (other.nodeConnectorId != null) {
-                    return false;
-                }
-            } else if (!nodeConnectorId.equals(other.nodeConnectorId)) {
-                return false;
-            }
-            if (queueId == null) {
-                if (other.queueId != null) {
-                    return false;
-                }
-            } else if (!queueId.equals(other.queueId)) {
-                return false;
-            }
-            return true;
-        }
+        this.targetNodeRef = new NodeRef(targetNodeIdentifier);
+
+        this.flowStatsService = flowStatsService;
+        this.flowTableStatsService = flowTableStatsService;
+        this.groupStatsService = groupStatsService;
+        this.meterStatsService = meterStatsService;
+        this.portStatsService = portStatsService;
+        this.queueStatsService = queueStatsService;
+
+        final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(StatisticsProvider.STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES);
+        flowStats = new FlowStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
+        flowTableStats = new FlowTableStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
+        groupDescStats = new GroupDescStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
+        groupStats = new GroupStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
+        meterConfigStats = new MeterConfigStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
+        meterStats = new MeterStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
+        nodeConnectorStats = new NodeConnectorStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
+        queueStats = new QueueStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
     }
 
     public NodeKey getTargetNodeKey() {
         return targetNodeKey;
     }
 
-    public synchronized void updateGroupDescStats(List<GroupDescStats> list){
-        final Long expiryTime = getExpiryTime();
-        final DataModificationTransaction trans = statisticsProvider.startChange();
-
-        for (GroupDescStats groupDescStats : list) {
-            GroupBuilder groupBuilder = new GroupBuilder();
-            GroupKey groupKey = new GroupKey(groupDescStats.getGroupId());
-            groupBuilder.setKey(groupKey);
-
-            InstanceIdentifier<Group> groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
-                                                                                        .augmentation(FlowCapableNode.class)
-                                                                                        .child(Group.class,groupKey).toInstance();
+    public Collection<TableKey> getKnownTables() {
+        return flowTableStats.getTables();
+    }
 
-            NodeGroupDescStatsBuilder groupDesc= new NodeGroupDescStatsBuilder();
-            GroupDescBuilder stats = new GroupDescBuilder();
-            stats.fieldsFrom(groupDescStats);
-            groupDesc.setGroupDesc(stats.build());
+    public InstanceIdentifier<Node> getTargetNodeIdentifier() {
+        return targetNodeIdentifier;
+    }
 
-            //Update augmented data
-            groupBuilder.addAugmentation(NodeGroupDescStats.class, groupDesc.build());
+    public NodeRef getTargetNodeRef() {
+        return targetNodeRef;
+    }
 
-            trans.putOperationalData(groupRef, groupBuilder.build());
-            this.groupDescStatsUpdate.put(groupDescStats, expiryTime);
+    public synchronized void updateGroupDescStats(TransactionAware transaction, Boolean more, List<GroupDescStats> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            groupDescStats.updateStats(list);
         }
-
-        trans.commit();
     }
 
-
-    public synchronized void updateGroupStats(List<GroupStats> list) {
-        final DataModificationTransaction trans = statisticsProvider.startChange();
-
-        for(GroupStats groupStats : list) {
-            GroupBuilder groupBuilder = new GroupBuilder();
-            GroupKey groupKey = new GroupKey(groupStats.getGroupId());
-            groupBuilder.setKey(groupKey);
-
-            InstanceIdentifier<Group> groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
-                                                                                        .augmentation(FlowCapableNode.class)
-                                                                                        .child(Group.class,groupKey).toInstance();
-
-            NodeGroupStatisticsBuilder groupStatisticsBuilder= new NodeGroupStatisticsBuilder();
-            GroupStatisticsBuilder stats = new GroupStatisticsBuilder();
-            stats.fieldsFrom(groupStats);
-            groupStatisticsBuilder.setGroupStatistics(stats.build());
-
-            //Update augmented data
-            groupBuilder.addAugmentation(NodeGroupStatistics.class, groupStatisticsBuilder.build());
-            trans.putOperationalData(groupRef, groupBuilder.build());
-
-            // FIXME: should we be tracking this data?
+    public synchronized void updateGroupStats(TransactionAware transaction, Boolean more, List<GroupStats> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            groupStats.updateStats(list);
         }
-
-        trans.commit();
     }
 
-    public synchronized void updateMeterConfigStats(List<MeterConfigStats> list) {
-        final Long expiryTime = getExpiryTime();
-        final DataModificationTransaction trans = statisticsProvider.startChange();
-
-        for(MeterConfigStats meterConfigStats : list) {
-            MeterBuilder meterBuilder = new MeterBuilder();
-            MeterKey meterKey = new MeterKey(meterConfigStats.getMeterId());
-            meterBuilder.setKey(meterKey);
-
-            InstanceIdentifier<Meter> meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
-                                                                                        .augmentation(FlowCapableNode.class)
-                                                                                        .child(Meter.class,meterKey).toInstance();
-
-            NodeMeterConfigStatsBuilder meterConfig= new NodeMeterConfigStatsBuilder();
-            MeterConfigStatsBuilder stats = new MeterConfigStatsBuilder();
-            stats.fieldsFrom(meterConfigStats);
-            meterConfig.setMeterConfigStats(stats.build());
-
-            //Update augmented data
-            meterBuilder.addAugmentation(NodeMeterConfigStats.class, meterConfig.build());
-
-            trans.putOperationalData(meterRef, meterBuilder.build());
-            this.meterConfigStatsUpdate.put(meterConfigStats, expiryTime);
+    public synchronized void updateMeterConfigStats(TransactionAware transaction, Boolean more, List<MeterConfigStats> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            meterConfigStats.updateStats(list);
         }
-
-        trans.commit();
     }
 
-
-    public synchronized void updateMeterStats(List<MeterStats> list) {
-        final DataModificationTransaction trans = statisticsProvider.startChange();
-
-        for(MeterStats meterStats : list) {
-            MeterBuilder meterBuilder = new MeterBuilder();
-            MeterKey meterKey = new MeterKey(meterStats.getMeterId());
-            meterBuilder.setKey(meterKey);
-
-            InstanceIdentifier<Meter> meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
-                                                                                        .augmentation(FlowCapableNode.class)
-                                                                                        .child(Meter.class,meterKey).toInstance();
-
-            NodeMeterStatisticsBuilder meterStatsBuilder= new NodeMeterStatisticsBuilder();
-            MeterStatisticsBuilder stats = new MeterStatisticsBuilder();
-            stats.fieldsFrom(meterStats);
-            meterStatsBuilder.setMeterStatistics(stats.build());
-
-            //Update augmented data
-            meterBuilder.addAugmentation(NodeMeterStatistics.class, meterStatsBuilder.build());
-            trans.putOperationalData(meterRef, meterBuilder.build());
-
-            // FIXME: should we be tracking this data?
+    public synchronized void updateMeterStats(TransactionAware transaction, Boolean more, List<MeterStats> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            meterStats.updateStats(list);
         }
-
-        trans.commit();
     }
 
-    public synchronized void updateQueueStats(List<QueueIdAndStatisticsMap> list) {
-        final Long expiryTime = getExpiryTime();
-        final DataModificationTransaction trans = statisticsProvider.startChange();
-
-        for (QueueIdAndStatisticsMap swQueueStats : list) {
-
-            QueueEntry queueEntry = new QueueEntry(swQueueStats.getNodeConnectorId(),swQueueStats.getQueueId());
-
-            FlowCapableNodeConnectorQueueStatisticsDataBuilder queueStatisticsDataBuilder = new FlowCapableNodeConnectorQueueStatisticsDataBuilder();
-
-            FlowCapableNodeConnectorQueueStatisticsBuilder queueStatisticsBuilder = new FlowCapableNodeConnectorQueueStatisticsBuilder();
-
-            queueStatisticsBuilder.fieldsFrom(swQueueStats);
-
-            queueStatisticsDataBuilder.setFlowCapableNodeConnectorQueueStatistics(queueStatisticsBuilder.build());
-
-            InstanceIdentifier<Queue> queueRef
-                    = InstanceIdentifier.builder(Nodes.class)
-                                        .child(Node.class, targetNodeKey)
-                                        .child(NodeConnector.class, new NodeConnectorKey(swQueueStats.getNodeConnectorId()))
-                                        .augmentation(FlowCapableNodeConnector.class)
-                                        .child(Queue.class, new QueueKey(swQueueStats.getQueueId())).toInstance();
-
-            QueueBuilder queueBuilder = new QueueBuilder();
-            FlowCapableNodeConnectorQueueStatisticsData qsd = queueStatisticsDataBuilder.build();
-            queueBuilder.addAugmentation(FlowCapableNodeConnectorQueueStatisticsData.class, qsd);
-            queueBuilder.setKey(new QueueKey(swQueueStats.getQueueId()));
-
-            logger.debug("Augmenting queue statistics {} of queue {} to port {}",
-                                        qsd,
-                                        swQueueStats.getQueueId(),
-                                        swQueueStats.getNodeConnectorId());
-
-            trans.putOperationalData(queueRef, queueBuilder.build());
-            this.queuesStatsUpdate.put(queueEntry, expiryTime);
+    public synchronized void updateQueueStats(TransactionAware transaction, Boolean more, List<QueueIdAndStatisticsMap> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            queueStats.updateStats(list);
         }
-
-        trans.commit();
     }
 
-    public synchronized void updateFlowTableStats(List<FlowTableAndStatisticsMap> list) {
-        final DataModificationTransaction trans = statisticsProvider.startChange();
-
-        for (FlowTableAndStatisticsMap ftStats : list) {
-
-            InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
-                    .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(ftStats.getTableId().getValue())).toInstance();
-
-            FlowTableStatisticsDataBuilder statisticsDataBuilder = new FlowTableStatisticsDataBuilder();
-
-            FlowTableStatisticsBuilder statisticsBuilder = new FlowTableStatisticsBuilder();
-            statisticsBuilder.setActiveFlows(ftStats.getActiveFlows());
-            statisticsBuilder.setPacketsLookedUp(ftStats.getPacketsLookedUp());
-            statisticsBuilder.setPacketsMatched(ftStats.getPacketsMatched());
-
-            final FlowTableStatistics stats = statisticsBuilder.build();
-            statisticsDataBuilder.setFlowTableStatistics(stats);
-
-            logger.debug("Augment flow table statistics: {} for table {} on Node {}",
-                    stats,ftStats.getTableId(), targetNodeKey);
-
-            TableBuilder tableBuilder = new TableBuilder();
-            tableBuilder.setKey(new TableKey(ftStats.getTableId().getValue()));
-            tableBuilder.addAugmentation(FlowTableStatisticsData.class, statisticsDataBuilder.build());
-            trans.putOperationalData(tableRef, tableBuilder.build());
-
-            // FIXME: should we be tracking this data?
+    public synchronized void updateFlowTableStats(TransactionAware transaction, Boolean more, List<FlowTableAndStatisticsMap> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            flowTableStats.updateStats(list);
         }
-
-        trans.commit();
     }
 
-    public synchronized void updateNodeConnectorStats(List<NodeConnectorStatisticsAndPortNumberMap> list) {
-        final DataModificationTransaction trans = statisticsProvider.startChange();
-
-        for(NodeConnectorStatisticsAndPortNumberMap portStats : list) {
-
-            FlowCapableNodeConnectorStatisticsBuilder statisticsBuilder
-                                            = new FlowCapableNodeConnectorStatisticsBuilder();
-            statisticsBuilder.setBytes(portStats.getBytes());
-            statisticsBuilder.setCollisionCount(portStats.getCollisionCount());
-            statisticsBuilder.setDuration(portStats.getDuration());
-            statisticsBuilder.setPackets(portStats.getPackets());
-            statisticsBuilder.setReceiveCrcError(portStats.getReceiveCrcError());
-            statisticsBuilder.setReceiveDrops(portStats.getReceiveDrops());
-            statisticsBuilder.setReceiveErrors(portStats.getReceiveErrors());
-            statisticsBuilder.setReceiveFrameError(portStats.getReceiveFrameError());
-            statisticsBuilder.setReceiveOverRunError(portStats.getReceiveOverRunError());
-            statisticsBuilder.setTransmitDrops(portStats.getTransmitDrops());
-            statisticsBuilder.setTransmitErrors(portStats.getTransmitErrors());
-
-            //Augment data to the node-connector
-            FlowCapableNodeConnectorStatisticsDataBuilder statisticsDataBuilder =
-                    new FlowCapableNodeConnectorStatisticsDataBuilder();
-
-            statisticsDataBuilder.setFlowCapableNodeConnectorStatistics(statisticsBuilder.build());
-
-            InstanceIdentifier<NodeConnector> nodeConnectorRef = InstanceIdentifier.builder(Nodes.class)
-                    .child(Node.class, targetNodeKey)
-                    .child(NodeConnector.class, new NodeConnectorKey(portStats.getNodeConnectorId())).toInstance();
-
-            // FIXME: can we bypass this read?
-            NodeConnector nodeConnector = (NodeConnector)trans.readOperationalData(nodeConnectorRef);
-            if(nodeConnector != null){
-                final FlowCapableNodeConnectorStatisticsData stats = statisticsDataBuilder.build();
-                logger.debug("Augmenting port statistics {} to port {}",stats,nodeConnectorRef.toString());
-                NodeConnectorBuilder nodeConnectorBuilder = new NodeConnectorBuilder();
-                nodeConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, stats);
-                trans.putOperationalData(nodeConnectorRef, nodeConnectorBuilder.build());
-            }
-
-            // FIXME: should we be tracking this data?
+    public synchronized void updateNodeConnectorStats(TransactionAware transaction, Boolean more, List<NodeConnectorStatisticsAndPortNumberMap> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            nodeConnectorStats.updateStats(list);
         }
-
-        trans.commit();
     }
 
-    public synchronized void updateAggregateFlowStats(Short tableId, AggregateFlowStatistics flowStats) {
+    public synchronized void updateAggregateFlowStats(TransactionAware transaction, Boolean more, AggregateFlowStatistics flowStats) {
+        final Short tableId = msgManager.isExpectedTableTransaction(transaction, more);
         if (tableId != null) {
-            final DataModificationTransaction trans = statisticsProvider.startChange();
-
-
+            final DataModificationTransaction trans = dps.beginTransaction();
             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
 
@@ -476,13 +225,18 @@ public class NodeStatisticsHandler {
             tableBuilder.addAugmentation(AggregateFlowStatisticsData.class, aggregateFlowStatisticsDataBuilder.build());
             trans.putOperationalData(tableRef, tableBuilder.build());
 
-            // FIXME: should we be tracking this data?
             trans.commit();
         }
     }
 
+    public synchronized void updateFlowStats(TransactionAware transaction, Boolean more, List<FlowAndStatisticsMapList> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            flowStats.updateStats(list);
+        }
+    }
+
     public synchronized void updateGroupFeatures(GroupFeatures notification) {
-        final DataModificationTransaction trans = statisticsProvider.startChange();
+        final DataModificationTransaction trans = dps.beginTransaction();
 
         final NodeBuilder nodeData = new NodeBuilder();
         nodeData.setKey(targetNodeKey);
@@ -500,7 +254,7 @@ public class NodeStatisticsHandler {
     }
 
     public synchronized void updateMeterFeatures(MeterFeatures features) {
-        final DataModificationTransaction trans = statisticsProvider.startChange();
+        final DataModificationTransaction trans = dps.beginTransaction();
 
         final NodeBuilder nodeData = new NodeBuilder();
         nodeData.setKey(targetNodeKey);
@@ -517,254 +271,195 @@ public class NodeStatisticsHandler {
         trans.commit();
     }
 
-    public synchronized void updateFlowStats(List<FlowAndStatisticsMapList> list) {
-        final Long expiryTime = getExpiryTime();
-        final DataModificationTransaction trans = statisticsProvider.startChange();
-
-        for(FlowAndStatisticsMapList map : list) {
-            short tableId = map.getTableId();
-            boolean foundOriginalFlow = false;
-
-            FlowBuilder flowBuilder = new FlowBuilder();
-
-            FlowStatisticsDataBuilder flowStatisticsData = new FlowStatisticsDataBuilder();
-
-            FlowBuilder flow = new FlowBuilder();
-            flow.setContainerName(map.getContainerName());
-            flow.setBufferId(map.getBufferId());
-            flow.setCookie(map.getCookie());
-            flow.setCookieMask(map.getCookieMask());
-            flow.setFlags(map.getFlags());
-            flow.setFlowName(map.getFlowName());
-            flow.setHardTimeout(map.getHardTimeout());
-            if(map.getFlowId() != null)
-                flow.setId(new FlowId(map.getFlowId().getValue()));
-            flow.setIdleTimeout(map.getIdleTimeout());
-            flow.setInstallHw(map.isInstallHw());
-            flow.setInstructions(map.getInstructions());
-            if(map.getFlowId()!= null)
-                flow.setKey(new FlowKey(new FlowId(map.getKey().getFlowId().getValue())));
-            flow.setMatch(map.getMatch());
-            flow.setOutGroup(map.getOutGroup());
-            flow.setOutPort(map.getOutPort());
-            flow.setPriority(map.getPriority());
-            flow.setStrict(map.isStrict());
-            flow.setTableId(tableId);
-
-            Flow flowRule = flow.build();
-
-            FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder();
-            stats.setByteCount(map.getByteCount());
-            stats.setPacketCount(map.getPacketCount());
-            stats.setDuration(map.getDuration());
-
-            GenericStatistics flowStats = stats.build();
-
-            //Augment the data to the flow node
-
-            FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder();
-            flowStatistics.setByteCount(flowStats.getByteCount());
-            flowStatistics.setPacketCount(flowStats.getPacketCount());
-            flowStatistics.setDuration(flowStats.getDuration());
-            flowStatistics.setContainerName(map.getContainerName());
-            flowStatistics.setBufferId(map.getBufferId());
-            flowStatistics.setCookie(map.getCookie());
-            flowStatistics.setCookieMask(map.getCookieMask());
-            flowStatistics.setFlags(map.getFlags());
-            flowStatistics.setFlowName(map.getFlowName());
-            flowStatistics.setHardTimeout(map.getHardTimeout());
-            flowStatistics.setIdleTimeout(map.getIdleTimeout());
-            flowStatistics.setInstallHw(map.isInstallHw());
-            flowStatistics.setInstructions(map.getInstructions());
-            flowStatistics.setMatch(map.getMatch());
-            flowStatistics.setOutGroup(map.getOutGroup());
-            flowStatistics.setOutPort(map.getOutPort());
-            flowStatistics.setPriority(map.getPriority());
-            flowStatistics.setStrict(map.isStrict());
-            flowStatistics.setTableId(tableId);
-
-            flowStatisticsData.setFlowStatistics(flowStatistics.build());
-
-            logger.debug("Flow : {}",flowRule.toString());
-            logger.debug("Statistics to augment : {}",flowStatistics.build().toString());
+    public synchronized void cleanStaleStatistics() {
+        final DataModificationTransaction trans = dps.beginTransaction();
+        final long now = System.nanoTime();
 
-            InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
-                    .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
+        flowStats.cleanup(trans, now);
+        groupDescStats.cleanup(trans, now);
+        groupStats.cleanup(trans, now);
+        meterConfigStats.cleanup(trans, now);
+        meterStats.cleanup(trans, now);
+        nodeConnectorStats.cleanup(trans, now);
+        queueStats.cleanup(trans, now);
+        msgManager.cleanStaleTransactionIds();
 
-            Table table= (Table)trans.readConfigurationData(tableRef);
-
-            //TODO: Not a good way to do it, need to figure out better way.
-            //TODO: major issue in any alternate approach is that flow key is incrementally assigned
-            //to the flows stored in data store.
-            // Augment same statistics to all the matching masked flow
-            if(table != null){
-
-                for(Flow existingFlow : table.getFlow()){
-                    logger.debug("Existing flow in data store : {}",existingFlow.toString());
-                    if(FlowComparator.flowEquals(flowRule,existingFlow)){
-                        InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
-                                .augmentation(FlowCapableNode.class)
-                                .child(Table.class, new TableKey(tableId))
-                                .child(Flow.class,existingFlow.getKey()).toInstance();
-                        flowBuilder.setKey(existingFlow.getKey());
-                        flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
-                        logger.debug("Found matching flow in the datastore, augmenting statistics");
-                        foundOriginalFlow = true;
-                        // Update entry with timestamp of latest response
-                        flow.setKey(existingFlow.getKey());
-                        FlowEntry flowStatsEntry = new FlowEntry(tableId,flow.build());
-                        flowStatsUpdate.put(flowStatsEntry, expiryTime);
-
-                        trans.putOperationalData(flowRef, flowBuilder.build());
-                    }
-                }
+        trans.commit();
+    }
+
+    public synchronized void requestPeriodicStatistics() {
+        logger.debug("Send requests for statistics collection to node : {}", targetNodeKey);
+
+        try{
+            if(flowTableStatsService != null){
+                final GetFlowTablesStatisticsInputBuilder input = new GetFlowTablesStatisticsInputBuilder();
+                input.setNode(targetNodeRef);
+
+                Future<RpcResult<GetFlowTablesStatisticsOutput>> response = flowTableStatsService.getFlowTablesStatistics(input.build());
+                recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW_TABLE);
             }
+            if(flowStatsService != null){
+                // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
+                //        comes back -- we do not have any tables anyway.
+                sendAggregateFlowsStatsFromAllTablesRequest();
 
-            table = (Table)trans.readOperationalData(tableRef);
-            if(!foundOriginalFlow && table != null){
-
-                for(Flow existingFlow : table.getFlow()){
-                    FlowStatisticsData augmentedflowStatisticsData = existingFlow.getAugmentation(FlowStatisticsData.class);
-                    if(augmentedflowStatisticsData != null){
-                        FlowBuilder existingOperationalFlow = new FlowBuilder();
-                        existingOperationalFlow.fieldsFrom(augmentedflowStatisticsData.getFlowStatistics());
-                        logger.debug("Existing unaccounted flow in operational data store : {}",existingFlow.toString());
-                        if(FlowComparator.flowEquals(flowRule,existingOperationalFlow.build())){
-                            InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
-                                    .augmentation(FlowCapableNode.class)
-                                    .child(Table.class, new TableKey(tableId))
-                                    .child(Flow.class,existingFlow.getKey()).toInstance();
-                            flowBuilder.setKey(existingFlow.getKey());
-                            flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
-                            logger.debug("Found matching unaccounted flow in the operational datastore, augmenting statistics");
-                            foundOriginalFlow = true;
-
-                            // Update entry with timestamp of latest response
-                            flow.setKey(existingFlow.getKey());
-                            FlowEntry flowStatsEntry = new FlowEntry(tableId,flow.build());
-                            flowStatsUpdate.put(flowStatsEntry, expiryTime);
-                            trans.putOperationalData(flowRef, flowBuilder.build());
-                            break;
-                        }
-                    }
-                }
+                sendAllFlowsStatsFromAllTablesRequest();
+            }
+            if(portStatsService != null){
+                sendAllNodeConnectorsStatisticsRequest();
+            }
+            if(groupStatsService != null){
+                sendAllGroupStatisticsRequest();
+                sendGroupDescriptionRequest();
             }
-            if(!foundOriginalFlow){
-                String flowKey = "#UF$TABLE*"+Short.toString(tableId)+"*"+Integer.toString(this.unaccountedFlowsCounter);
-                this.unaccountedFlowsCounter++;
-                FlowKey newFlowKey = new FlowKey(new FlowId(flowKey));
-                InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
-                        .augmentation(FlowCapableNode.class)
-                        .child(Table.class, new TableKey(tableId))
-                        .child(Flow.class,newFlowKey).toInstance();
-                flowBuilder.setKey(newFlowKey);
-                flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
-                logger.debug("Flow {} is not present in config data store, augmenting statistics as an unaccounted flow",
-                        flowBuilder.build());
-
-                // Update entry with timestamp of latest response
-                flow.setKey(newFlowKey);
-                FlowEntry flowStatsEntry = new FlowEntry(tableId,flow.build());
-                flowStatsUpdate.put(flowStatsEntry, expiryTime);
-                trans.putOperationalData(flowRef, flowBuilder.build());
+            if(meterStatsService != null){
+                sendAllMeterStatisticsRequest();
+                sendMeterConfigStatisticsRequest();
             }
+            if(queueStatsService != null){
+                sendAllQueueStatsFromAllNodeConnector();
+            }
+        } catch(Exception e) {
+            logger.error("Exception occured while sending statistics requests", e);
         }
+    }
 
-        trans.commit();
+    public synchronized void start() {
+        requestPeriodicStatistics();
     }
 
-    private static Long getExpiryTime(){
-        final long now = System.nanoTime();
-        return now + TimeUnit.MILLISECONDS.toNanos(StatisticsProvider.STATS_THREAD_EXECUTION_TIME * NUMBER_OF_WAIT_CYCLES);
+    @Override
+    public synchronized void close() {
+        // FIXME: cleanup any resources we hold (registrations, etc.)
+        logger.debug("Statistics handler for {} shut down", targetNodeKey.getId());
     }
 
-    public synchronized void cleanStaleStatistics(){
-        final DataModificationTransaction trans = this.statisticsProvider.startChange();
-        final long now = System.nanoTime();
+    synchronized void sendFlowStatsFromTableRequest(Flow flow) throws InterruptedException, ExecutionException{
+        final GetFlowStatisticsFromFlowTableInputBuilder input =
+                new GetFlowStatisticsFromFlowTableInputBuilder(flow);
 
-        //Clean stale statistics related to group
-        for (Iterator<Entry<GroupDescStats, Long>> it = this.groupDescStatsUpdate.entrySet().iterator();it.hasNext();){
-            Entry<GroupDescStats, Long> e = it.next();
-            if (now > e.getValue()) {
-                cleanGroupStatsFromDataStore(trans, e.getKey());
-                it.remove();
-            }
-        }
+        input.setNode(targetNodeRef);
 
-        //Clean stale statistics related to meter
-        for (Iterator<Entry<MeterConfigStats, Long>> it = this.meterConfigStatsUpdate.entrySet().iterator();it.hasNext();){
-            Entry<MeterConfigStats, Long> e = it.next();
-            if (now > e.getValue()) {
-                cleanMeterStatsFromDataStore(trans, e.getKey());
-                it.remove();
-            }
-        }
+        Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response =
+                flowStatsService.getFlowStatisticsFromFlowTable(input.build());
 
-        //Clean stale statistics related to flow
-        for (Iterator<Entry<FlowEntry, Long>> it = this.flowStatsUpdate.entrySet().iterator();it.hasNext();){
-            Entry<FlowEntry, Long> e = it.next();
-            if (now > e.getValue()) {
-                cleanFlowStatsFromDataStore(trans, e.getKey());
-                it.remove();
-            }
-        }
+        recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW);
+    }
 
-        //Clean stale statistics related to queue
-        for (Iterator<Entry<QueueEntry, Long>> it = this.queuesStatsUpdate.entrySet().iterator();it.hasNext();){
-            Entry<QueueEntry, Long> e = it.next();
-            if (now > e.getValue()) {
-                cleanQueueStatsFromDataStore(trans, e.getKey());
-                it.remove();
-            }
-        }
+    synchronized void sendGroupDescriptionRequest() throws InterruptedException, ExecutionException{
+        final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
 
-        trans.commit();
+        input.setNode(targetNodeRef);
+
+        Future<RpcResult<GetGroupDescriptionOutput>> response =
+                groupStatsService.getGroupDescription(input.build());
+
+        recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.GROUP_DESC);
     }
 
-    private void cleanQueueStatsFromDataStore(DataModificationTransaction trans, QueueEntry queueEntry) {
-        InstanceIdentifier<?> queueRef
-                        = InstanceIdentifier.builder(Nodes.class)
-                                            .child(Node.class, this.targetNodeKey)
-                                            .child(NodeConnector.class, new NodeConnectorKey(queueEntry.getNodeConnectorId()))
-                                            .augmentation(FlowCapableNodeConnector.class)
-                                            .child(Queue.class, new QueueKey(queueEntry.getQueueId()))
-                                            .augmentation(FlowCapableNodeConnectorQueueStatisticsData.class).toInstance();
-        trans.removeOperationalData(queueRef);
+    synchronized void sendMeterConfigStatisticsRequest() throws InterruptedException, ExecutionException{
+
+        GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
+
+        input.setNode(targetNodeRef);
+
+        Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
+                meterStatsService.getAllMeterConfigStatistics(input.build());
+
+        recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.METER_CONFIG);
     }
 
-    private void cleanFlowStatsFromDataStore(DataModificationTransaction trans, FlowEntry flowEntry) {
-        InstanceIdentifier<?> flowRef
-                        = InstanceIdentifier.builder(Nodes.class).child(Node.class, this.targetNodeKey)
-                                            .augmentation(FlowCapableNode.class)
-                                            .child(Table.class, new TableKey(flowEntry.getTableId()))
-                                            .child(Flow.class,flowEntry.getFlow().getKey())
-                                            .augmentation(FlowStatisticsData.class).toInstance();
-        trans.removeOperationalData(flowRef);
+    synchronized void sendQueueStatsFromGivenNodeConnector(NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
+        GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder();
+
+        input.setNode(targetNodeRef);
+        input.setNodeConnectorId(nodeConnectorId);
+        input.setQueueId(queueId);
+        Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response =
+                queueStatsService.getQueueStatisticsFromGivenPort(input.build());
+
+        recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);;
     }
 
-    private void cleanMeterStatsFromDataStore(DataModificationTransaction trans, MeterConfigStats meterConfigStats) {
-        InstanceIdentifierBuilder<Meter> meterRef
-                        = InstanceIdentifier.builder(Nodes.class).child(Node.class,this.targetNodeKey)
-                                            .augmentation(FlowCapableNode.class)
-                                            .child(Meter.class,new MeterKey(meterConfigStats.getMeterId()));
+    private void sendAllMeterStatisticsRequest() throws InterruptedException, ExecutionException{
 
-        InstanceIdentifier<?> nodeMeterConfigStatsAugmentation = meterRef.augmentation(NodeMeterConfigStats.class).toInstance();
-        trans.removeOperationalData(nodeMeterConfigStatsAugmentation);
+        GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
 
-        InstanceIdentifier<?> nodeMeterStatisticsAugmentation = meterRef.augmentation(NodeMeterStatistics.class).toInstance();
-        trans.removeOperationalData(nodeMeterStatisticsAugmentation);
+        input.setNode(targetNodeRef);
+
+        Future<RpcResult<GetAllMeterStatisticsOutput>> response =
+                meterStatsService.getAllMeterStatistics(input.build());
+
+        recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_METER);
     }
 
-    private void cleanGroupStatsFromDataStore(DataModificationTransaction trans, GroupDescStats groupDescStats) {
-        InstanceIdentifierBuilder<Group> groupRef
-                        = InstanceIdentifier.builder(Nodes.class).child(Node.class,this.targetNodeKey)
-                                            .augmentation(FlowCapableNode.class)
-                                            .child(Group.class,new GroupKey(groupDescStats.getGroupId()));
+    private void sendAllFlowsStatsFromAllTablesRequest() throws InterruptedException, ExecutionException{
+        final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input = new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
+        input.setNode(targetNodeRef);
+
+        Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response = flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
+
+        recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW);
+    }
+
+    private void sendAggregateFlowsStatsFromAllTablesRequest() throws InterruptedException, ExecutionException{
+        final Collection<TableKey> tables = getKnownTables();
+        logger.debug("Node {} supports {} table(s)", targetNodeKey, tables.size());
+
+        for (TableKey key : tables) {
+            sendAggregateFlowsStatsFromTableRequest(key.getId().shortValue());
+        }
+    }
 
-        InstanceIdentifier<?> nodeGroupDescStatsAugmentation = groupRef.augmentation(NodeGroupDescStats.class).toInstance();
-        trans.removeOperationalData(nodeGroupDescStatsAugmentation);
+    private void sendAggregateFlowsStatsFromTableRequest(Short tableId) throws InterruptedException, ExecutionException{
+        logger.debug("Send aggregate stats request for flow table {} to node {}",tableId, targetNodeKey);
+        GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
+                new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
+
+        input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
+        input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId));
+        Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
+                flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
+
+        recordExpectedTableTransaction(response.get().getResult().getTransactionId(), tableId);
+    }
+
+    private void sendAllQueueStatsFromAllNodeConnector() throws InterruptedException, ExecutionException {
+        GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
+
+        input.setNode(targetNodeRef);
+
+        Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
+                queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
+
+        recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);
+    }
+
+    private void sendAllNodeConnectorsStatisticsRequest() throws InterruptedException, ExecutionException{
+        final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
+
+        input.setNode(targetNodeRef);
+
+        Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
+                portStatsService.getAllNodeConnectorsStatistics(input.build());
+        recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_PORT);
+    }
+
+    private void sendAllGroupStatisticsRequest() throws InterruptedException, ExecutionException{
+        final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
+        input.setNode(targetNodeRef);
+
+        Future<RpcResult<GetAllGroupStatisticsOutput>> response =
+                groupStatsService.getAllGroupStatistics(input.build());
+
+        recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_GROUP);
+    }
+
+    private void recordExpectedTransaction(TransactionId transactionId, StatsRequestType reqType) {
+        msgManager.recordExpectedTransaction(transactionId, reqType);
+    }
 
-        InstanceIdentifier<?> nodeGroupStatisticsAugmentation = groupRef.augmentation(NodeGroupStatistics.class).toInstance();
-        trans.removeOperationalData(nodeGroupStatisticsAugmentation);
+    private void recordExpectedTableTransaction(TransactionId transactionId, Short tableId) {
+        msgManager.recordExpectedTableTransaction(transactionId, StatsRequestType.AGGR_FLOW, tableId);
     }
 }
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/QueueStatsEntry.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/QueueStatsEntry.java
new file mode 100644 (file)
index 0000000..d1f2529
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Copyright IBM Corporation, 2013.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
+
+final class QueueStatsEntry {
+    private final NodeConnectorId nodeConnectorId;
+    private final QueueId queueId;
+    public QueueStatsEntry(NodeConnectorId ncId, QueueId queueId){
+        this.nodeConnectorId = ncId;
+        this.queueId = queueId;
+    }
+    public NodeConnectorId getNodeConnectorId() {
+        return nodeConnectorId;
+    }
+    public QueueId getQueueId() {
+        return queueId;
+    }
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((nodeConnectorId == null) ? 0 : nodeConnectorId.hashCode());
+        result = prime * result + ((queueId == null) ? 0 : queueId.hashCode());
+        return result;
+    }
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof QueueStatsEntry)) {
+            return false;
+        }
+        QueueStatsEntry other = (QueueStatsEntry) obj;
+        if (nodeConnectorId == null) {
+            if (other.nodeConnectorId != null) {
+                return false;
+            }
+        } else if (!nodeConnectorId.equals(other.nodeConnectorId)) {
+            return false;
+        }
+        if (queueId == null) {
+            if (other.queueId != null) {
+                return false;
+            }
+        } else if (!queueId.equals(other.queueId)) {
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/QueueStatsTracker.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/QueueStatsTracker.java
new file mode 100644 (file)
index 0000000..c2bde6a
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Copyright IBM Corporation, 2013.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.flow.capable.node.connector.queue.statistics.FlowCapableNodeConnectorQueueStatisticsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.queue.id.and.statistics.map.QueueIdAndStatisticsMap;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class QueueStatsTracker extends AbstractStatsTracker<QueueIdAndStatisticsMap, QueueStatsEntry> {
+    private static final Logger logger = LoggerFactory.getLogger(QueueStatsTracker.class);
+
+    QueueStatsTracker(InstanceIdentifier<Node> nodeIdentifier,
+            DataProviderService dps, long lifetimeNanos) {
+        super(nodeIdentifier, dps, lifetimeNanos);
+    }
+
+    @Override
+    protected void cleanupSingleStat(DataModificationTransaction trans, QueueStatsEntry item) {
+        InstanceIdentifier<?> queueRef
+                            = getNodeIdentifierBuilder().child(NodeConnector.class, new NodeConnectorKey(item.getNodeConnectorId()))
+                                                .augmentation(FlowCapableNodeConnector.class)
+                                                .child(Queue.class, new QueueKey(item.getQueueId()))
+                                                .augmentation(FlowCapableNodeConnectorQueueStatisticsData.class).build();
+        trans.removeOperationalData(queueRef);
+    }
+
+    @Override
+    protected QueueStatsEntry updateSingleStat(DataModificationTransaction trans, QueueIdAndStatisticsMap item) {
+
+        QueueStatsEntry queueEntry = new QueueStatsEntry(item.getNodeConnectorId(), item.getQueueId());
+
+        FlowCapableNodeConnectorQueueStatisticsDataBuilder queueStatisticsDataBuilder = new FlowCapableNodeConnectorQueueStatisticsDataBuilder();
+
+        FlowCapableNodeConnectorQueueStatisticsBuilder queueStatisticsBuilder = new FlowCapableNodeConnectorQueueStatisticsBuilder();
+
+        queueStatisticsBuilder.fieldsFrom(item);
+
+        queueStatisticsDataBuilder.setFlowCapableNodeConnectorQueueStatistics(queueStatisticsBuilder.build());
+
+        InstanceIdentifier<Queue> queueRef = getNodeIdentifierBuilder().child(NodeConnector.class, new NodeConnectorKey(item.getNodeConnectorId()))
+                                    .augmentation(FlowCapableNodeConnector.class)
+                                    .child(Queue.class, new QueueKey(item.getQueueId())).toInstance();
+
+        QueueBuilder queueBuilder = new QueueBuilder();
+        FlowCapableNodeConnectorQueueStatisticsData qsd = queueStatisticsDataBuilder.build();
+        queueBuilder.addAugmentation(FlowCapableNodeConnectorQueueStatisticsData.class, qsd);
+        queueBuilder.setKey(new QueueKey(item.getQueueId()));
+
+        logger.debug("Augmenting queue statistics {} of queue {} to port {}",
+                                    qsd,
+                                    item.getQueueId(),
+                                    item.getNodeConnectorId());
+
+        trans.putOperationalData(queueRef, queueBuilder.build());
+        return queueEntry;
+    }
+}
index 155815d..bd9f96c 100644 (file)
@@ -44,7 +44,6 @@ public class StatisticsListener implements OpendaylightGroupStatisticsListener,
 
     private final static Logger sucLogger = LoggerFactory.getLogger(StatisticsListener.class);
     private final StatisticsProvider statisticsManager;
-    private final MultipartMessageManager messageManager;
 
     /**
      * default ctor
@@ -52,56 +51,37 @@ public class StatisticsListener implements OpendaylightGroupStatisticsListener,
      */
     public StatisticsListener(final StatisticsProvider manager){
         this.statisticsManager = manager;
-        this.messageManager = this.statisticsManager.getMultipartMessageManager();
     }
 
     @Override
     public void onMeterConfigStatsUpdated(final MeterConfigStatsUpdated notification) {
-        //Check if response is for the request statistics-manager sent.
-        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
-            return;
-
-        //Add statistics to local cache
         final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
         if (handler != null) {
-            handler.updateMeterConfigStats(notification.getMeterConfigStats());
+            handler.updateMeterConfigStats(notification, notification.isMoreReplies(), notification.getMeterConfigStats());
         }
     }
 
     @Override
     public void onMeterStatisticsUpdated(MeterStatisticsUpdated notification) {
-        //Check if response is for the request statistics-manager sent.
-        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
-            return;
-
-        //Add statistics to local cache
         final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
         if (handler != null) {
-            handler.updateMeterStats(notification.getMeterStats());
+            handler.updateMeterStats(notification, notification.isMoreReplies(), notification.getMeterStats());
         }
     }
 
     @Override
     public void onGroupDescStatsUpdated(GroupDescStatsUpdated notification) {
-        //Check if response is for the request statistics-manager sent.
-        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
-            return;
-
         final NodeStatisticsHandler handler = statisticsManager.getStatisticsHandler(notification.getId());
         if (handler != null) {
-            handler.updateGroupDescStats(notification.getGroupDescStats());
+            handler.updateGroupDescStats(notification, notification.isMoreReplies(), notification.getGroupDescStats());
         }
     }
 
     @Override
     public void onGroupStatisticsUpdated(GroupStatisticsUpdated notification) {
-        //Check if response is for the request statistics-manager sent.
-        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
-            return;
-
         final NodeStatisticsHandler handler = statisticsManager.getStatisticsHandler(notification.getId());
         if (handler != null) {
-            handler.updateGroupStats(notification.getGroupStats());
+            handler.updateGroupStats(notification, notification.isMoreReplies(), notification.getGroupStats());
         }
     }
 
@@ -123,65 +103,42 @@ public class StatisticsListener implements OpendaylightGroupStatisticsListener,
 
     @Override
     public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
-        //Check if response is for the request statistics-manager sent.
-        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
-            return;
-
         sucLogger.debug("Received flow stats update : {}",notification.toString());
         final NodeStatisticsHandler sna = this.statisticsManager.getStatisticsHandler(notification.getId());
         if (sna != null) {
-            sna.updateFlowStats(notification.getFlowAndStatisticsMapList());
+            sna.updateFlowStats(notification, notification.isMoreReplies(), notification.getFlowAndStatisticsMapList());
         }
     }
 
     @Override
     public void onAggregateFlowStatisticsUpdate(AggregateFlowStatisticsUpdate notification) {
-        //Check if response is for the request statistics-manager sent.
-        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
-            return;
-
         final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
         if (handler != null) {
-            final Short tableId = messageManager.getTableIdForTxId(notification.getId(),notification.getTransactionId());
-            handler.updateAggregateFlowStats(tableId, notification);
+            handler.updateAggregateFlowStats(notification, notification.isMoreReplies(), notification);
         }
     }
 
     @Override
     public void onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate notification) {
-        //Check if response is for the request statistics-manager sent.
-        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
-            return;
-
         final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
         if (handler != null) {
-            handler.updateNodeConnectorStats(notification.getNodeConnectorStatisticsAndPortNumberMap());
+            handler.updateNodeConnectorStats(notification, notification.isMoreReplies(), notification.getNodeConnectorStatisticsAndPortNumberMap());
         }
     }
 
     @Override
     public void onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate notification) {
-        //Check if response is for the request statistics-manager sent.
-        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
-            return;
-
         final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
         if (handler != null) {
-            handler.updateFlowTableStats(notification.getFlowTableAndStatisticsMap());
+            handler.updateFlowTableStats(notification, notification.isMoreReplies(), notification.getFlowTableAndStatisticsMap());
         }
     }
 
     @Override
     public void onQueueStatisticsUpdate(QueueStatisticsUpdate notification) {
-        //Check if response is for the request statistics-manager sent.
-        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
-            return;
-
-        //Add statistics to local cache
         final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
         if (handler != null) {
-            handler.updateQueueStats(notification.getQueueIdAndStatisticsMap());
+            handler.updateQueueStats(notification, notification.isMoreReplies(), notification.getQueueIdAndStatisticsMap());
         }
     }
 }
-
index ab5d20a..9ab1e9c 100644 (file)
@@ -7,18 +7,18 @@
  */
 package org.opendaylight.controller.md.statistics.manager;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
-import org.eclipse.xtext.xbase.lib.Exceptions;
-import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
+import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
@@ -26,49 +26,26 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.Fl
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.NotificationListener;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,17 +64,14 @@ import com.google.common.base.Preconditions;
  *
  */
 public class StatisticsProvider implements AutoCloseable {
-    public static final int STATS_THREAD_EXECUTION_TIME= 15000;
+    public static final long STATS_COLLECTION_MILLIS = TimeUnit.SECONDS.toMillis(15);
 
     private static final Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
 
-    private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
-    private final InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
+    private final ConcurrentMap<NodeId, NodeStatisticsHandler> handlers = new ConcurrentHashMap<>();
+    private final Timer timer = new Timer("statistics-manager", true);
     private final DataProviderService dps;
 
-    //Local caching of stats
-    private final ConcurrentMap<NodeId,NodeStatisticsHandler> statisticsCache = new ConcurrentHashMap<>();
-
     private OpendaylightGroupStatisticsService groupStatsService;
 
     private OpendaylightMeterStatisticsService meterStatsService;
@@ -112,31 +86,19 @@ public class StatisticsProvider implements AutoCloseable {
 
     private StatisticsUpdateHandler statsUpdateHandler;
 
-    private Thread statisticsRequesterThread;
-
-    private Thread statisticsAgerThread;
-
-
     public StatisticsProvider(final DataProviderService dataService) {
         this.dps = Preconditions.checkNotNull(dataService);
     }
 
-    public MultipartMessageManager getMultipartMessageManager() {
-        return multipartMessageManager;
-    }
-
     private final StatisticsListener updateCommiter = new StatisticsListener(StatisticsProvider.this);
 
     private Registration<NotificationListener> listenerRegistration;
 
-    public void start(final DataBrokerService dbs, final NotificationProviderService nps, final RpcConsumerRegistry rpcRegistry) {
-
-        this.listenerRegistration = nps.registerNotificationListener(this.updateCommiter);
+    private ListenerRegistration<DataChangeListener> flowCapableTrackerRegistration;
 
-        statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
-        registerDataStoreUpdateListener(dbs);
+    public void start(final DataBrokerService dbs, final NotificationProviderService nps, final RpcConsumerRegistry rpcRegistry) {
 
-        // Get Group/Meter statistics service instance
+        // Get Group/Meter statistics service instances
         groupStatsService = rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class);
         meterStatsService = rpcRegistry.getRpcService(OpendaylightMeterStatisticsService.class);
         flowStatsService = rpcRegistry.getRpcService(OpendaylightFlowStatisticsService.class);
@@ -144,57 +106,45 @@ public class StatisticsProvider implements AutoCloseable {
         flowTableStatsService = rpcRegistry.getRpcService(OpendaylightFlowTableStatisticsService.class);
         queueStatsService = rpcRegistry.getRpcService(OpendaylightQueueStatisticsService.class);
 
-        statisticsRequesterThread = new Thread( new Runnable(){
-
-            @Override
-            public void run() {
-                while(true){
-                    try {
-                        statsRequestSender();
-
-                        Thread.sleep(STATS_THREAD_EXECUTION_TIME);
-                    }catch (Exception e){
-                        spLogger.error("Exception occurred while sending stats request : {}",e);
-                    }
-                }
-            }
-        });
-
-        spLogger.debug("Statistics requester thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
+        // Start receiving notifications
+        this.listenerRegistration = nps.registerNotificationListener(this.updateCommiter);
 
-        statisticsRequesterThread.start();
+        // Register for switch connect/disconnect notifications
+        final InstanceIdentifier<FlowCapableNode> fcnId = InstanceIdentifier.builder(Nodes.class)
+                .child(Node.class).augmentation(FlowCapableNode.class).build();
+        spLogger.debug("Registering FlowCapable tracker to {}", fcnId);
+        this.flowCapableTrackerRegistration = dbs.registerDataChangeListener(fcnId,
+                new FlowCapableTracker(this, fcnId));
 
-        statisticsAgerThread = new Thread( new Runnable(){
+        statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
+        registerDataStoreUpdateListener(dbs);
 
+        timer.schedule(new TimerTask() {
             @Override
             public void run() {
-                while(true){
-                    try {
-                        for(NodeStatisticsHandler nodeStatisticsAger : statisticsCache.values()){
-                            nodeStatisticsAger.cleanStaleStatistics();
-                        }
-                        multipartMessageManager.cleanStaleTransactionIds();
-
-                        Thread.sleep(STATS_THREAD_EXECUTION_TIME);
-                    }catch (Exception e){
-                        spLogger.error("Exception occurred while sending stats request : {}",e);
+                try {
+                    // Send stats requests
+                    for (NodeStatisticsHandler h : handlers.values()) {
+                        h.requestPeriodicStatistics();
                     }
-                }
-            }
-        });
 
-        spLogger.debug("Statistics ager thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
+                    // Perform cleanup
+                    for(NodeStatisticsHandler nodeStatisticsAger : handlers.values()){
+                        nodeStatisticsAger.cleanStaleStatistics();
+                    }
 
-        statisticsAgerThread.start();
+                } catch (RuntimeException e) {
+                    spLogger.warn("Failed to request statistics", e);
+                }
+            }
+        }, 0, STATS_COLLECTION_MILLIS);
 
+        spLogger.debug("Statistics timer task with timer interval : {}ms", STATS_COLLECTION_MILLIS);
         spLogger.info("Statistics Provider started.");
     }
 
     private void registerDataStoreUpdateListener(DataBrokerService dbs) {
-        //Register for Node updates
-        InstanceIdentifier<? extends DataObject> pathNode = InstanceIdentifier.builder(Nodes.class)
-                                                                        .child(Node.class).toInstance();
-        dbs.registerDataChangeListener(pathNode, statsUpdateHandler);
+        // FIXME: the below should be broken out into StatisticsUpdateHandler
 
         //Register for flow updates
         InstanceIdentifier<? extends DataObject> pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class)
@@ -228,224 +178,32 @@ public class StatisticsProvider implements AutoCloseable {
         return dps.beginTransaction();
     }
 
-    private void statsRequestSender(){
-
-        List<Node> targetNodes = getAllConnectedNodes();
-
-        if(targetNodes == null)
-            return;
-
-
-        for (Node targetNode : targetNodes){
-
-            if(targetNode.getAugmentation(FlowCapableNode.class) != null){
-                sendStatisticsRequestsToNode(targetNode);
-            }
+    public void sendFlowStatsFromTableRequest(NodeKey node, Flow flow) throws InterruptedException, ExecutionException {
+        final NodeStatisticsHandler h = getStatisticsHandler(node.getId());
+        if (h != null) {
+            h.sendFlowStatsFromTableRequest(flow);
         }
     }
 
-    public void sendStatisticsRequestsToNode(Node targetNode){
-
-        spLogger.debug("Send requests for statistics collection to node : {})",targetNode.getId());
-
-        InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
-
-        NodeRef targetNodeRef = new NodeRef(targetInstanceId);
-
-        try{
-            if(flowStatsService != null){
-                sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
-                sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
-            }
-            if(flowTableStatsService != null){
-                sendAllFlowTablesStatisticsRequest(targetNodeRef);
-            }
-            if(portStatsService != null){
-                sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
-            }
-            if(groupStatsService != null){
-                sendAllGroupStatisticsRequest(targetNodeRef);
-                sendGroupDescriptionRequest(targetNodeRef);
-            }
-            if(meterStatsService != null){
-                sendAllMeterStatisticsRequest(targetNodeRef);
-                sendMeterConfigStatisticsRequest(targetNodeRef);
-            }
-            if(queueStatsService != null){
-                sendAllQueueStatsFromAllNodeConnector (targetNodeRef);
-            }
-        }catch(Exception e){
-            spLogger.error("Exception occured while sending statistics requests : {}", e);
+    public void sendGroupDescriptionRequest(NodeKey node) throws InterruptedException, ExecutionException{
+        final NodeStatisticsHandler h = getStatisticsHandler(node.getId());
+        if (h != null) {
+            h.sendGroupDescriptionRequest();
         }
     }
 
-
-    public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
-        final GetFlowTablesStatisticsInputBuilder input =
-                new GetFlowTablesStatisticsInputBuilder();
-
-        input.setNode(targetNodeRef);
-
-        Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
-                flowTableStatsService.getFlowTablesStatistics(input.build());
-
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNodeRef),response.get().getResult().getTransactionId()
-                , StatsRequestType.ALL_FLOW_TABLE);
-
-    }
-
-    public void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-        final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
-                new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
-
-        input.setNode(targetNode);
-
-        Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
-                flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
-
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
-                , StatsRequestType.ALL_FLOW);
-
-    }
-
-    public void sendFlowStatsFromTableRequest(NodeRef targetNode,Flow flow) throws InterruptedException, ExecutionException{
-        final GetFlowStatisticsFromFlowTableInputBuilder input =
-                new GetFlowStatisticsFromFlowTableInputBuilder();
-
-        input.setNode(targetNode);
-        input.fieldsFrom(flow);
-
-        Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response =
-                flowStatsService.getFlowStatisticsFromFlowTable(input.build());
-
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
-                , StatsRequestType.ALL_FLOW);
-
-    }
-
-    public void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
-
-        List<Short> tablesId = getTablesFromNode(targetNodeKey);
-
-        if(tablesId.size() != 0){
-            for(Short id : tablesId){
-
-                sendAggregateFlowsStatsFromTableRequest(targetNodeKey,id);
-            }
-        }else{
-            spLogger.debug("No details found in data store for flow tables associated with Node {}",targetNodeKey);
+    public void sendMeterConfigStatisticsRequest(NodeKey node) throws InterruptedException, ExecutionException {
+        final NodeStatisticsHandler h = getStatisticsHandler(node.getId());
+        if (h != null) {
+            h.sendMeterConfigStatisticsRequest();
         }
     }
 
-    public void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
-
-        spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey);
-        GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
-                new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
-
-        input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
-        input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId));
-        Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
-                flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
-
-        multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), tableId);
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId()
-                , StatsRequestType.AGGR_FLOW);
-    }
-
-    public void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
-        final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
-
-        input.setNode(targetNode);
-
-        Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
-                portStatsService.getAllNodeConnectorsStatistics(input.build());
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
-                , StatsRequestType.ALL_PORT);
-
-    }
-
-    public void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
-        final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
-
-        input.setNode(targetNode);
-
-        Future<RpcResult<GetAllGroupStatisticsOutput>> response =
-                groupStatsService.getAllGroupStatistics(input.build());
-
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
-                , StatsRequestType.ALL_GROUP);
-
-    }
-
-    public void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-        final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
-
-        input.setNode(targetNode);
-
-        Future<RpcResult<GetGroupDescriptionOutput>> response =
-                groupStatsService.getGroupDescription(input.build());
-
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
-                , StatsRequestType.GROUP_DESC);
-
-    }
-
-    public void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
-        GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
-
-        input.setNode(targetNode);
-
-        Future<RpcResult<GetAllMeterStatisticsOutput>> response =
-                meterStatsService.getAllMeterStatistics(input.build());
-
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
-                , StatsRequestType.ALL_METER);;
-
-    }
-
-    public void sendMeterConfigStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
-        GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
-
-        input.setNode(targetNode);
-
-        Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
-                meterStatsService.getAllMeterConfigStatistics(input.build());
-
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
-                , StatsRequestType.METER_CONFIG);;
-
-    }
-
-    public void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
-        GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
-
-        input.setNode(targetNode);
-
-        Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
-                queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
-
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
-                , StatsRequestType.ALL_QUEUE_STATS);;
-
-    }
-
-    public void sendQueueStatsFromGivenNodeConnector(NodeRef targetNode,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
-        GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder();
-
-        input.setNode(targetNode);
-        input.setNodeConnectorId(nodeConnectorId);
-        input.setQueueId(queueId);
-        Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response =
-                queueStatsService.getQueueStatisticsFromGivenPort(input.build());
-
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
-                , StatsRequestType.ALL_QUEUE_STATS);;
-
+    public void sendQueueStatsFromGivenNodeConnector(NodeKey node,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
+        final NodeStatisticsHandler h = getStatisticsHandler(node.getId());
+        if (h != null) {
+            h.sendQueueStatsFromGivenNodeConnector(nodeConnectorId, queueId);
+        }
     }
 
     /**
@@ -457,63 +215,61 @@ public class StatisticsProvider implements AutoCloseable {
      */
     public final NodeStatisticsHandler getStatisticsHandler(final NodeId nodeId) {
         Preconditions.checkNotNull(nodeId);
-        NodeStatisticsHandler ager = statisticsCache.get(nodeId);
-        if (ager == null) {
-            ager = new NodeStatisticsHandler(this, new NodeKey(nodeId));
-            statisticsCache.put(nodeId, ager);
+        NodeStatisticsHandler handler = handlers.get(nodeId);
+        if (handler == null) {
+            spLogger.info("Attempted to get non-existing handler for {}", nodeId);
         }
-
-        return ager;
-    }
-
-    private List<Node> getAllConnectedNodes(){
-        Nodes nodes = (Nodes) dps.readOperationalData(nodesIdentifier);
-        if(nodes == null)
-            return null;
-
-        spLogger.debug("Number of connected nodes : {}",nodes.getNode().size());
-        return nodes.getNode();
+        return handler;
     }
 
-    private List<Short> getTablesFromNode(NodeKey nodeKey){
-        InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
-
-        FlowCapableNode node = (FlowCapableNode)dps.readOperationalData(nodesIdentifier);
-        List<Short> tablesId = new ArrayList<Short>();
-        if(node != null && node.getTable()!=null){
-            spLogger.debug("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
-            for(Table table: node.getTable()){
-                tablesId.add(table.getId());
-            }
-        }
-        return tablesId;
-    }
-
-    @SuppressWarnings("unchecked")
-    private NodeId getNodeId(NodeRef nodeRef){
-        InstanceIdentifier<Node> nodeII = (InstanceIdentifier<Node>) nodeRef.getValue();
-        NodeKey nodeKey = InstanceIdentifier.keyOf(nodeII);
-        return nodeKey.getId();
-    }
-
-    @SuppressWarnings("deprecation")
     @Override
-    public void close(){
-
+    public void close() {
         try {
-            spLogger.info("Statistics Provider stopped.");
             if (this.listenerRegistration != null) {
-
                 this.listenerRegistration.close();
+                this.listenerRegistration = null;
+            }
+            if (this.flowCapableTrackerRegistration != null) {
+                this.flowCapableTrackerRegistration.close();
+                this.flowCapableTrackerRegistration = null;
+            }
+            timer.cancel();
+        } catch (Exception e) {
+            spLogger.warn("Failed to stop Statistics Provider completely", e);
+        } finally {
+            spLogger.info("Statistics Provider stopped.");
+        }
+    }
 
-                this.statisticsRequesterThread.destroy();
-
-                this.statisticsAgerThread.destroy();
+    void startNodeHandlers(final Collection<NodeKey> addedNodes) {
+        for (NodeKey key : addedNodes) {
+            if (handlers.containsKey(key.getId())) {
+                spLogger.warn("Attempted to start already-existing handler for {}, very strange", key.getId());
+                continue;
+            }
 
+            final NodeStatisticsHandler h = new NodeStatisticsHandler(dps, key,
+                    flowStatsService, flowTableStatsService, groupStatsService,
+                    meterStatsService, portStatsService, queueStatsService);
+            final NodeStatisticsHandler old = handlers.putIfAbsent(key.getId(), h);
+            if (old == null) {
+                spLogger.debug("Started node handler for {}", key.getId());
+                h.start();
+            } else {
+                spLogger.debug("Prevented race on handler for {}", key.getId());
             }
-          } catch (Throwable e) {
-            throw Exceptions.sneakyThrow(e);
-          }
+        }
     }
 
+    void stopNodeHandlers(final Collection<NodeKey> removedNodes) {
+        for (NodeKey key : removedNodes) {
+            final NodeStatisticsHandler s = handlers.remove(key.getId());
+            if (s != null) {
+                spLogger.debug("Stopping node handler for {}", key.getId());
+                s.close();
+            } else {
+                spLogger.warn("Attempted to remove non-existing handler for {}, very strange", key.getId());
+            }
+        }
+    }
 }
index acf182a..0459bc8 100644 (file)
@@ -14,7 +14,6 @@ import java.util.concurrent.ExecutionException;
 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
@@ -22,10 +21,10 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.q
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData;
@@ -57,42 +56,28 @@ public class StatisticsUpdateHandler implements DataChangeListener {
     @SuppressWarnings("unchecked")
     @Override
     public void onDataChanged(DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
-
-        Map<InstanceIdentifier<?>, DataObject> nodeAdditions = change.getCreatedOperationalData();
-        for (InstanceIdentifier<? extends DataObject> dataObjectInstance : nodeAdditions.keySet()) {
-            DataObject dataObject = nodeAdditions.get(dataObjectInstance);
-            if(dataObject instanceof Node){
-
-                Node node = (Node) dataObject;
-                if(node.getAugmentation(FlowCapableNode.class) != null){
-                    this.statisticsManager.sendStatisticsRequestsToNode(node);
-                }
-            }
-        }
-
         Map<InstanceIdentifier<?>, DataObject> additions = change.getCreatedConfigurationData();
         for (InstanceIdentifier<? extends DataObject> dataObjectInstance : additions.keySet()) {
             DataObject dataObject = additions.get(dataObjectInstance);
-            InstanceIdentifier<Node> nodeII = dataObjectInstance.firstIdentifierOf(Node.class);
-            NodeRef nodeRef = new NodeRef(nodeII);
+            NodeKey nodeII = dataObjectInstance.firstKeyOf(Node.class, NodeKey.class);
             if(dataObject instanceof Flow){
                 Flow flow = (Flow) dataObject;
                 try {
-                    this.statisticsManager.sendFlowStatsFromTableRequest(nodeRef, flow);
+                    this.statisticsManager.sendFlowStatsFromTableRequest(nodeII, flow);
                 } catch (InterruptedException | ExecutionException e) {
                     suhLogger.warn("Following exception occured while sending flow statistics request newly added flow: {}", e);
                 }
             }
             if(dataObject instanceof Meter){
                 try {
-                    this.statisticsManager.sendMeterConfigStatisticsRequest(nodeRef);
+                    this.statisticsManager.sendMeterConfigStatisticsRequest(nodeII);
                 } catch (InterruptedException | ExecutionException e) {
                     suhLogger.warn("Following exception occured while sending meter statistics request for newly added meter: {}", e);
                 }
             }
             if(dataObject instanceof Group){
                 try {
-                    this.statisticsManager.sendGroupDescriptionRequest(nodeRef);
+                    this.statisticsManager.sendGroupDescriptionRequest(nodeII);
                 } catch (InterruptedException | ExecutionException e) {
                     suhLogger.warn("Following exception occured while sending group description request for newly added group: {}", e);
                 }
@@ -102,7 +87,8 @@ public class StatisticsUpdateHandler implements DataChangeListener {
                 InstanceIdentifier<NodeConnector> nodeConnectorII = dataObjectInstance.firstIdentifierOf(NodeConnector.class);
                 NodeConnectorKey nodeConnectorKey = InstanceIdentifier.keyOf(nodeConnectorII);
                 try {
-                    this.statisticsManager.sendQueueStatsFromGivenNodeConnector(nodeRef, nodeConnectorKey.getId(), queue.getQueueId());
+                    this.statisticsManager.sendQueueStatsFromGivenNodeConnector(nodeII,
+                            nodeConnectorKey.getId(), queue.getQueueId());
                 } catch (InterruptedException | ExecutionException e) {
                     suhLogger.warn("Following exception occured while sending queue statistics request for newly added group: {}", e);
                 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.