Gerrit contains following minor enhancements: 48/4748/1
authorAnil Vishnoi <avishnoi@in.ibm.com>
Thu, 23 Jan 2014 09:13:43 +0000 (14:43 +0530)
committerAnil Vishnoi <avishnoi@in.ibm.com>
Fri, 24 Jan 2014 23:31:38 +0000 (05:01 +0530)
1) Send statistics requests whenever new flow capable node connects to the controller
2) Clean up transaction-id cache for expired Ids
3) Remove Tx id when last part of multipart response received.

Change-Id: I4055b7e7ad10a67e78bafd3b977db642fe5b1ee3
Signed-off-by: Anil Vishnoi <avishnoi@in.ibm.com>
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MultipartMessageManager.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateHandler.java

index 998d5d8faaf24fd09e10d6a5865f1a5c169e6d96..dfe356bdb1efbbc5ec7d299025b6cf879d820b38 100644 (file)
@@ -7,10 +7,13 @@
  */
 package org.opendaylight.controller.md.statistics.manager;
 
+import java.util.Date;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 
 /**
  * Main responsibility of the class is to manage multipart response 
@@ -27,32 +30,119 @@ public class MultipartMessageManager {
      *  response for which it didn't send the request.  
      */
     
-    private static Map<TransactionId,StatsRequestType> txIdToRequestTypeMap = new ConcurrentHashMap<TransactionId,StatsRequestType>();
+    private static Map<TxIdEntry,Date> txIdToRequestTypeMap = new ConcurrentHashMap<TxIdEntry,Date>();
     /*
      * Map to keep track of the request tx id for flow table statistics request.
      * Because flow table statistics multi part response do not contains the table id.
      */
-    private static Map<TransactionId,Short> txIdTotableIdMap = new ConcurrentHashMap<TransactionId,Short>();
+    private static Map<TxIdEntry,Short> txIdTotableIdMap = new ConcurrentHashMap<TxIdEntry,Short>();
     
+    private final int NUMBER_OF_WAIT_CYCLES =2;
+
+    class TxIdEntry{
+        private final TransactionId txId;
+        private final NodeId nodeId;
+        private final StatsRequestType requestType;
+        
+        public TxIdEntry(NodeId nodeId, TransactionId txId, StatsRequestType requestType){
+            this.txId = txId;
+            this.nodeId = nodeId;
+            this.requestType = requestType;
+        }
+        public TransactionId getTxId() {
+            return txId;
+        }
+        public NodeId getNodeId() {
+            return nodeId;
+        }
+        public StatsRequestType getRequestType() {
+            return requestType;
+        }
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + getOuterType().hashCode();
+            result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
+            result = prime * result + ((txId == null) ? 0 : txId.hashCode());
+            return result;
+        }
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (!(obj instanceof TxIdEntry)) {
+                return false;
+            }
+            TxIdEntry other = (TxIdEntry) obj;
+            if (!getOuterType().equals(other.getOuterType())) {
+                return false;
+            }
+            if (nodeId == null) {
+                if (other.nodeId != null) {
+                    return false;
+                }
+            } else if (!nodeId.equals(other.nodeId)) {
+                return false;
+            }
+            if (txId == null) {
+                if (other.txId != null) {
+                    return false;
+                }
+            } else if (!txId.equals(other.txId)) {
+                return false;
+            }
+            return true;
+        }
+        private MultipartMessageManager getOuterType() {
+            return MultipartMessageManager.this;
+        }
+        @Override
+        public String toString() {
+            return "TxIdEntry [txId=" + txId + ", nodeId=" + nodeId + ", requestType=" + requestType + "]";
+        }
+    }
+
     public MultipartMessageManager(){}
     
-    public Short getTableIdForTxId(TransactionId id){
+    public Short getTableIdForTxId(NodeId nodeId,TransactionId id){
         
-        return txIdTotableIdMap.get(id);
+        return txIdTotableIdMap.get(new TxIdEntry(nodeId,id,null));
         
     }
     
-    public void setTxIdAndTableIdMapEntry(TransactionId id,Short tableId){
-        txIdTotableIdMap.put(id, tableId);
+    public void setTxIdAndTableIdMapEntry(NodeId nodeId, TransactionId id,Short tableId){
+        
+        txIdTotableIdMap.put(new TxIdEntry(nodeId,id,null), tableId);
     }
     
-    public void addTxIdToRequestTypeEntry (TransactionId id,StatsRequestType type){
-        txIdToRequestTypeMap.put(id, type);
+    public boolean isRequestTxIdExist(NodeId nodeId, TransactionId id, Boolean moreRepliesToFollow){
+        TxIdEntry entry = new TxIdEntry(nodeId,id,null);
+        if(moreRepliesToFollow.booleanValue()){
+            return txIdToRequestTypeMap.containsKey(entry);
+        }else{
+            return txIdToRequestTypeMap.remove(entry)==null?false:true;
+        }
     }
-    public StatsRequestType removeTxId(TransactionId id){
-        return txIdToRequestTypeMap.remove(id);
+    public void addTxIdToRequestTypeEntry (NodeId nodeId, TransactionId id,StatsRequestType type){
+        TxIdEntry entry = new TxIdEntry(nodeId,id,type);
+        txIdToRequestTypeMap.put(entry, getExpiryTime());
+    }
+    public boolean removeTxId(NodeId nodeId, TransactionId id){
+        TxIdEntry entry = new TxIdEntry(nodeId,id,null);
+        return txIdToRequestTypeMap.remove(entry)==null?false:true;
     }
     
+    private Date getExpiryTime(){
+        Date expires = new Date();
+        expires.setTime(expires.getTime()+StatisticsProvider.STATS_THREAD_EXECUTION_TIME*NUMBER_OF_WAIT_CYCLES);
+        return expires;
+    }
+
     public enum StatsRequestType{
         ALL_FLOW,
         AGGR_FLOW,
@@ -64,4 +154,16 @@ public class MultipartMessageManager {
         GROUP_DESC,
         METER_CONFIG
     }
+    
+    public void cleanStaleTransactionIds(){
+        for (Iterator<TxIdEntry> it = txIdToRequestTypeMap.keySet().iterator();it.hasNext();){
+            TxIdEntry txIdEntry = it.next();
+            Date now = new Date();
+            Date expiryTime = txIdToRequestTypeMap.get(txIdEntry);
+            if(now.after(expiryTime)){
+                it.remove();
+                txIdTotableIdMap.remove(txIdEntry);
+            }            
+        }
+    }
 }
index 7b7403f1c327f6492d81e0a1980510025b46e37e..491ff4bf44cbdfa403a1869974f5682873980ffb 100644 (file)
@@ -115,7 +115,7 @@ public class StatisticsProvider implements AutoCloseable {
 
     private final  InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
     
-    public static final int STATS_THREAD_EXECUTION_TIME= 30000;
+    public static final int STATS_THREAD_EXECUTION_TIME= 15000;
     //Local caching of stats
     
     private final ConcurrentMap<NodeId,NodeStatisticsAger> statisticsCache = 
@@ -211,6 +211,7 @@ public class StatisticsProvider implements AutoCloseable {
                         for(NodeStatisticsAger nodeStatisticsAger : statisticsCache.values()){
                             nodeStatisticsAger.cleanStaleStatistics();
                         }
+                        multipartMessageManager.cleanStaleTransactionIds();
                         
                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
                     }catch (Exception e){
@@ -228,6 +229,11 @@ public class StatisticsProvider implements AutoCloseable {
     }
     
     private void registerDataStoreUpdateListener(DataBrokerService dbs) {
+        //Register for Node updates
+        InstanceIdentifier<? extends DataObject> pathNode = InstanceIdentifier.builder(Nodes.class)
+                                                                        .child(Node.class).toInstance();
+        dbs.registerDataChangeListener(pathNode, statsUpdateHandler);
+
         //Register for flow updates
         InstanceIdentifier<? extends DataObject> pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class)
                                                                     .augmentation(FlowCapableNode.class)
@@ -273,37 +279,42 @@ public class StatisticsProvider implements AutoCloseable {
         for (Node targetNode : targetNodes){
             
             if(targetNode.getAugmentation(FlowCapableNode.class) != null){
+                sendStatisticsRequestsToNode(targetNode);
+            }
+        }
+    }
+    
+    public void sendStatisticsRequestsToNode(Node targetNode){
+        
+        spLogger.debug("Send requests for statistics collection to node : {})",targetNode.getId());
+        
+        InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
+        
+        NodeRef targetNodeRef = new NodeRef(targetInstanceId);
+    
+        try{
+            sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
+        
+            sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
 
-                spLogger.info("Send request for stats collection to node : {})",targetNode.getId());
-                
-                InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
-                
-                NodeRef targetNodeRef = new NodeRef(targetInstanceId);
-            
-                try{
-                    sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
-                
-                    sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
+            sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
+        
+            sendAllFlowTablesStatisticsRequest(targetNodeRef);
+        
+            sendAllQueueStatsFromAllNodeConnector (targetNodeRef);
 
-                    sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
-                
-                    sendAllFlowTablesStatisticsRequest(targetNodeRef);
-                
-                    sendAllQueueStatsFromAllNodeConnector (targetNodeRef);
-
-                    sendAllGroupStatisticsRequest(targetNodeRef);
-                    
-                    sendAllMeterStatisticsRequest(targetNodeRef);
-                    
-                    sendGroupDescriptionRequest(targetNodeRef);
-                    
-                    sendMeterConfigStatisticsRequest(targetNodeRef);
-                }catch(Exception e){
-                    spLogger.error("Exception occured while sending statistics requests : {}", e);
-                }
-            }
+            sendAllGroupStatisticsRequest(targetNodeRef);
+            
+            sendAllMeterStatisticsRequest(targetNodeRef);
+            
+            sendGroupDescriptionRequest(targetNodeRef);
+            
+            sendMeterConfigStatisticsRequest(targetNodeRef);
+        }catch(Exception e){
+            spLogger.error("Exception occured while sending statistics requests : {}", e);
         }
     }
+    
 
     public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
         final GetFlowTablesStatisticsInputBuilder input = 
@@ -314,7 +325,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetFlowTablesStatisticsOutput>> response = 
                 flowTableStatsService.getFlowTablesStatistics(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNodeRef),response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_FLOW_TABLE);
 
     }
@@ -328,7 +339,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response = 
                 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
         
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_FLOW);
         
     }
@@ -343,7 +354,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response = 
                 flowStatsService.getFlowStatisticsFromFlowTable(input.build());
         
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_FLOW);
         
     }
@@ -355,7 +366,7 @@ public class StatisticsProvider implements AutoCloseable {
         if(tablesId.size() != 0){
             for(Short id : tablesId){
                 
-                spLogger.info("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey);
+                spLogger.debug("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey);
                 GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = 
                         new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
                 
@@ -364,8 +375,8 @@ public class StatisticsProvider implements AutoCloseable {
                 Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response = 
                         flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
                 
-                multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), id);
-                this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+                multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), id);
+                this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId()
                         , StatsRequestType.AGGR_FLOW);
             }
         }else{
@@ -381,7 +392,7 @@ public class StatisticsProvider implements AutoCloseable {
 
         Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response = 
                 portStatsService.getAllNodeConnectorsStatistics(input.build());
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_PORT);
 
     }
@@ -395,7 +406,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllGroupStatisticsOutput>> response = 
                 groupStatsService.getAllGroupStatistics(input.build());
         
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_GROUP);
 
     }
@@ -408,7 +419,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetGroupDescriptionOutput>> response = 
                 groupStatsService.getGroupDescription(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
                 , StatsRequestType.GROUP_DESC);
 
     }
@@ -422,7 +433,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllMeterStatisticsOutput>> response = 
                 meterStatsService.getAllMeterStatistics(input.build());
         
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_METER);;
 
     }
@@ -436,7 +447,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response = 
                 meterStatsService.getAllMeterConfigStatistics(input.build());
         
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
                 , StatsRequestType.METER_CONFIG);;
 
     }
@@ -449,7 +460,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response = 
                 queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
         
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_QUEUE_STATS);;
 
     }
@@ -463,7 +474,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response = 
                 queueStatsService.getQueueStatisticsFromGivenPort(input.build());
         
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_QUEUE_STATS);;
 
     }
@@ -496,6 +507,13 @@ public class StatisticsProvider implements AutoCloseable {
         return tablesId;
     }
 
+    @SuppressWarnings("unchecked")
+    private NodeId getNodeId(NodeRef nodeRef){
+        InstanceIdentifier<Node> nodeII = (InstanceIdentifier<Node>) nodeRef.getValue();
+        NodeKey nodeKey = InstanceIdentifier.keyOf(nodeII);
+        return nodeKey.getId();
+    }
+    
     @SuppressWarnings("deprecation")
     @Override
     public void close(){
index ace547a03c9f5d17d28764b0ffe9a53a227e1612..07dcd0f6aa74ae123fdc88e25c0115758518553d 100644 (file)
@@ -124,12 +124,14 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
     public final static Logger sucLogger = LoggerFactory.getLogger(StatisticsUpdateCommiter.class);
 
     private final StatisticsProvider statisticsManager;
+    private final MultipartMessageManager messageManager;
     
     private int unaccountedFlowsCounter = 1;
 
     public StatisticsUpdateCommiter(final StatisticsProvider manager){
 
         this.statisticsManager = manager;
+        this.messageManager = this.statisticsManager.getMultipartMessageManager();
     }
     
     public StatisticsProvider getStatisticsManager(){
@@ -139,7 +141,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
     @Override
     public void onMeterConfigStatsUpdated(MeterConfigStatsUpdated notification) {
         //Check if response is for the request statistics-manager sent.
-        if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
+        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
             return;
         
         NodeKey key = new NodeKey(notification.getId());
@@ -181,7 +183,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
     public void onMeterStatisticsUpdated(MeterStatisticsUpdated notification) {
         
         //Check if response is for the request statistics-manager sent.
-        if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
+        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
             return;
 
         NodeKey key = new NodeKey(notification.getId());
@@ -216,7 +218,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
     public void onGroupDescStatsUpdated(GroupDescStatsUpdated notification) {
         
         //Check if response is for the request statistics-manager sent.
-        if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
+        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
             return;
 
         NodeKey key = new NodeKey(notification.getId());
@@ -259,7 +261,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
     public void onGroupStatisticsUpdated(GroupStatisticsUpdated notification) {
         
         //Check if response is for the request statistics-manager sent.
-        if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
+        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
             return;
 
         //Publish data to configuration data store
@@ -350,7 +352,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
     public void onFlowsStatisticsUpdate(FlowsStatisticsUpdate notification) {
         
         //Check if response is for the request statistics-manager sent.
-        if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
+        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
             return;
 
         NodeKey key = new NodeKey(notification.getId());
@@ -454,7 +456,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
                                 .child(Flow.class,existingFlow.getKey()).toInstance();
                         flowBuilder.setKey(existingFlow.getKey());
                         flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
-                        sucLogger.info("Found matching flow in the datastore, augmenting statistics");
+                        sucLogger.debug("Found matching flow in the datastore, augmenting statistics");
                         foundOriginalFlow = true;
                         it.putOperationalData(flowRef, flowBuilder.build());
                         it.commit();
@@ -479,7 +481,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
                                     .child(Flow.class,existingFlow.getKey()).toInstance();
                             flowBuilder.setKey(existingFlow.getKey());
                             flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
-                            sucLogger.debug("Found matching flow in the operational datastore, augmenting statistics");
+                            sucLogger.debug("Found matching unaccounted flow in the operational datastore, augmenting statistics");
                             foundOriginalFlow = true;
                             it.putOperationalData(flowRef, flowBuilder.build());
                             it.commit();
@@ -489,7 +491,6 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
                 }
             }
             if(!foundOriginalFlow){
-                sucLogger.debug("Associated original flow is not found in data store. Augmenting flow in operational data store");
                 long flowKey = Long.parseLong(new String("1"+Short.toString(tableId)+"0"+Integer.toString(this.unaccountedFlowsCounter)));
                 this.unaccountedFlowsCounter++;
                 FlowKey newFlowKey = new FlowKey(new FlowId(Long.toString(flowKey)));
@@ -499,7 +500,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
                         .child(Flow.class,newFlowKey).toInstance();
                 flowBuilder.setKey(newFlowKey);
                 flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
-                sucLogger.info("Flow was no present in data store, augmenting statistics as an unaccounted flow");
+                sucLogger.info("Flow {} is not present in config data store, augmenting statistics as an unaccounted flow",flowBuilder.build());
                 it.putOperationalData(flowRef, flowBuilder.build());
                 it.commit();
             }
@@ -509,13 +510,12 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
     @Override
     public void onAggregateFlowStatisticsUpdate(AggregateFlowStatisticsUpdate notification) {
         //Check if response is for the request statistics-manager sent.
-        if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
+        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
             return;
 
         NodeKey key = new NodeKey(notification.getId());
-        sucLogger.debug("Received aggregate flow statistics update : {}",notification.toString());
         
-        Short tableId = this.statisticsManager.getMultipartMessageManager().getTableIdForTxId(notification.getTransactionId());
+        Short tableId = messageManager.getTableIdForTxId(notification.getId(),notification.getTransactionId());
         if(tableId != null){
             
             DataModificationTransaction it = this.statisticsManager.startChange();
@@ -544,11 +544,10 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
     @Override
     public void onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate notification) {
         //Check if response is for the request statistics-manager sent.
-        if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
+        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
             return;
 
         NodeKey key = new NodeKey(notification.getId());
-        sucLogger.debug("Received port stats update : {}",notification.toString());
         
         List<NodeConnectorStatisticsAndPortNumberMap> portsStats = notification.getNodeConnectorStatisticsAndPortNumberMap();
         for(NodeConnectorStatisticsAndPortNumberMap portStats : portsStats){
@@ -592,11 +591,10 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
     @Override
     public void onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate notification) {
         //Check if response is for the request statistics-manager sent.
-        if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
+        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
             return;
 
         NodeKey key = new NodeKey(notification.getId());
-        sucLogger.debug("Received flow table statistics update : {}",notification.toString());
         
         List<FlowTableAndStatisticsMap> flowTablesStatsList = notification.getFlowTableAndStatisticsMap();
         for (FlowTableAndStatisticsMap ftStats : flowTablesStatsList){
@@ -629,11 +627,10 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
     public void onQueueStatisticsUpdate(QueueStatisticsUpdate notification) {
         
         //Check if response is for the request statistics-manager sent.
-        if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
+        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
             return;
 
         NodeKey key = new NodeKey(notification.getId());
-        sucLogger.debug("Received queue stats update : {}",notification.toString());
         
         //Add statistics to local cache
         ConcurrentMap<NodeId, NodeStatisticsAger> cache = this.statisticsManager.getStatisticsCache();
@@ -670,7 +667,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
             queueBuilder.addAugmentation(FlowCapableNodeConnectorQueueStatisticsData.class, queueStatisticsDataBuilder.build());
             queueBuilder.setKey(new QueueKey(swQueueStats.getQueueId()));
 
-            sucLogger.info("Augmenting queue statistics {} of queue {} to port {}"
+            sucLogger.debug("Augmenting queue statistics {} of queue {} to port {}"
                                         ,queueStatisticsDataBuilder.build().toString(),
                                         swQueueStats.getQueueId(),
                                         swQueueStats.getNodeConnectorId());
@@ -686,7 +683,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList
         InstanceIdentifierBuilder<?> builder = InstanceIdentifier.builder(Nodes.class).child(Node.class, nodeKey);
         return new NodeRef(builder.toInstance());
     }
-    
+   
     public boolean flowEquals(Flow statsFlow, Flow storedFlow) {
         if (statsFlow.getClass() != storedFlow.getClass()) {
             return false;
index f04c29fdd214daabcc4c1b8e8399f5661e147565..941a8f8c2cd97e83a3a030e5d182fb835cb9c9e2 100644 (file)
@@ -14,6 +14,7 @@ import java.util.concurrent.ExecutionException;
 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
@@ -63,6 +64,18 @@ public class StatisticsUpdateHandler implements DataChangeListener {
     @Override
     public void onDataChanged(DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
         
+        Map<InstanceIdentifier<?>, DataObject> nodeAdditions = change.getCreatedOperationalData();
+        for (InstanceIdentifier<? extends DataObject> dataObjectInstance : nodeAdditions.keySet()) {
+            DataObject dataObject = nodeAdditions.get(dataObjectInstance);
+            if(dataObject instanceof Node){
+                
+                Node node = (Node) dataObject;
+                if(node.getAugmentation(FlowCapableNode.class) != null){
+                    this.statisticsManager.sendStatisticsRequestsToNode(node);
+                }
+            }
+        }
+
         Map<InstanceIdentifier<?>, DataObject> additions = change.getCreatedConfigurationData();
         for (InstanceIdentifier<? extends DataObject> dataObjectInstance : additions.keySet()) {
             DataObject dataObject = additions.get(dataObjectInstance);