Gerrit contains following fixes:
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / StatisticsProvider.java
index 7b7403f1c327f6492d81e0a1980510025b46e37e..325b342cd88bdb71d6d2138169f2f7ae36c83785 100644 (file)
@@ -115,7 +115,7 @@ public class StatisticsProvider implements AutoCloseable {
 
     private final  InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
     
-    public static final int STATS_THREAD_EXECUTION_TIME= 30000;
+    public static final int STATS_THREAD_EXECUTION_TIME= 15000;
     //Local caching of stats
     
     private final ConcurrentMap<NodeId,NodeStatisticsAger> statisticsCache = 
@@ -211,6 +211,7 @@ public class StatisticsProvider implements AutoCloseable {
                         for(NodeStatisticsAger nodeStatisticsAger : statisticsCache.values()){
                             nodeStatisticsAger.cleanStaleStatistics();
                         }
+                        multipartMessageManager.cleanStaleTransactionIds();
                         
                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
                     }catch (Exception e){
@@ -228,6 +229,11 @@ public class StatisticsProvider implements AutoCloseable {
     }
     
     private void registerDataStoreUpdateListener(DataBrokerService dbs) {
+        //Register for Node updates
+        InstanceIdentifier<? extends DataObject> pathNode = InstanceIdentifier.builder(Nodes.class)
+                                                                        .child(Node.class).toInstance();
+        dbs.registerDataChangeListener(pathNode, statsUpdateHandler);
+
         //Register for flow updates
         InstanceIdentifier<? extends DataObject> pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class)
                                                                     .augmentation(FlowCapableNode.class)
@@ -273,37 +279,46 @@ public class StatisticsProvider implements AutoCloseable {
         for (Node targetNode : targetNodes){
             
             if(targetNode.getAugmentation(FlowCapableNode.class) != null){
-
-                spLogger.info("Send request for stats collection to node : {})",targetNode.getId());
-                
-                InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
-                
-                NodeRef targetNodeRef = new NodeRef(targetInstanceId);
-            
-                try{
-                    sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
-                
-                    sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
-
-                    sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
-                
-                    sendAllFlowTablesStatisticsRequest(targetNodeRef);
-                
-                    sendAllQueueStatsFromAllNodeConnector (targetNodeRef);
-
-                    sendAllGroupStatisticsRequest(targetNodeRef);
-                    
-                    sendAllMeterStatisticsRequest(targetNodeRef);
-                    
-                    sendGroupDescriptionRequest(targetNodeRef);
-                    
-                    sendMeterConfigStatisticsRequest(targetNodeRef);
-                }catch(Exception e){
-                    spLogger.error("Exception occured while sending statistics requests : {}", e);
-                }
+                sendStatisticsRequestsToNode(targetNode);
             }
         }
     }
+    
+    public void sendStatisticsRequestsToNode(Node targetNode){
+        
+        spLogger.debug("Send requests for statistics collection to node : {})",targetNode.getId());
+        
+        InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
+        
+        NodeRef targetNodeRef = new NodeRef(targetInstanceId);
+    
+        try{
+            if(flowStatsService != null){
+                sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
+                sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
+            }
+            if(flowTableStatsService != null){
+                sendAllFlowTablesStatisticsRequest(targetNodeRef);
+            }
+            if(portStatsService != null){
+                sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
+            }
+            if(groupStatsService != null){
+                sendAllGroupStatisticsRequest(targetNodeRef);
+                sendGroupDescriptionRequest(targetNodeRef);
+            }
+            if(meterStatsService != null){
+                sendAllMeterStatisticsRequest(targetNodeRef);
+                sendMeterConfigStatisticsRequest(targetNodeRef);
+            }
+            if(queueStatsService != null){
+                sendAllQueueStatsFromAllNodeConnector (targetNodeRef);
+            }
+        }catch(Exception e){
+            spLogger.error("Exception occured while sending statistics requests : {}", e);
+        }
+    }
+    
 
     public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
         final GetFlowTablesStatisticsInputBuilder input = 
@@ -314,7 +329,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetFlowTablesStatisticsOutput>> response = 
                 flowTableStatsService.getFlowTablesStatistics(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNodeRef),response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_FLOW_TABLE);
 
     }
@@ -328,7 +343,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response = 
                 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
         
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_FLOW);
         
     }
@@ -343,7 +358,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response = 
                 flowStatsService.getFlowStatisticsFromFlowTable(input.build());
         
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_FLOW);
         
     }
@@ -355,23 +370,28 @@ public class StatisticsProvider implements AutoCloseable {
         if(tablesId.size() != 0){
             for(Short id : tablesId){
                 
-                spLogger.info("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey);
-                GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = 
-                        new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
-                
-                input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
-                input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(id));
-                Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response = 
-                        flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
-                
-                multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), id);
-                this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
-                        , StatsRequestType.AGGR_FLOW);
+                sendAggregateFlowsStatsFromTableRequest(targetNodeKey,id);
             }
         }else{
             spLogger.debug("No details found in data store for flow tables associated with Node {}",targetNodeKey);
         }
     }
+    
+    public void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
+        
+        spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey);
+        GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = 
+                new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
+                
+        input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
+        input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId));
+        Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response = 
+                flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
+                
+        multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), tableId);
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId()
+                , StatsRequestType.AGGR_FLOW);
+    }
 
     public void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
         
@@ -381,7 +401,7 @@ public class StatisticsProvider implements AutoCloseable {
 
         Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response = 
                 portStatsService.getAllNodeConnectorsStatistics(input.build());
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_PORT);
 
     }
@@ -395,7 +415,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllGroupStatisticsOutput>> response = 
                 groupStatsService.getAllGroupStatistics(input.build());
         
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_GROUP);
 
     }
@@ -408,7 +428,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetGroupDescriptionOutput>> response = 
                 groupStatsService.getGroupDescription(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
                 , StatsRequestType.GROUP_DESC);
 
     }
@@ -422,7 +442,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllMeterStatisticsOutput>> response = 
                 meterStatsService.getAllMeterStatistics(input.build());
         
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_METER);;
 
     }
@@ -436,7 +456,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response = 
                 meterStatsService.getAllMeterConfigStatistics(input.build());
         
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
                 , StatsRequestType.METER_CONFIG);;
 
     }
@@ -449,7 +469,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response = 
                 queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
         
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_QUEUE_STATS);;
 
     }
@@ -463,7 +483,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response = 
                 queueStatsService.getQueueStatisticsFromGivenPort(input.build());
         
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+        this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
                 , StatsRequestType.ALL_QUEUE_STATS);;
 
     }
@@ -478,7 +498,7 @@ public class StatisticsProvider implements AutoCloseable {
         if(nodes == null)
             return null;
         
-        spLogger.info("Number of connected nodes : {}",nodes.getNode().size());
+        spLogger.debug("Number of connected nodes : {}",nodes.getNode().size());
         return nodes.getNode();
     }
     
@@ -488,7 +508,7 @@ public class StatisticsProvider implements AutoCloseable {
         FlowCapableNode node = (FlowCapableNode)dps.readOperationalData(nodesIdentifier);
         List<Short> tablesId = new ArrayList<Short>();
         if(node != null && node.getTable()!=null){
-            spLogger.info("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
+            spLogger.debug("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
             for(Table table: node.getTable()){
                 tablesId.add(table.getId());
             }
@@ -496,6 +516,13 @@ public class StatisticsProvider implements AutoCloseable {
         return tablesId;
     }
 
+    @SuppressWarnings("unchecked")
+    private NodeId getNodeId(NodeRef nodeRef){
+        InstanceIdentifier<Node> nodeII = (InstanceIdentifier<Node>) nodeRef.getValue();
+        NodeKey nodeKey = InstanceIdentifier.keyOf(nodeII);
+        return nodeKey.getId();
+    }
+    
     @SuppressWarnings("deprecation")
     @Override
     public void close(){