private final InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
- public static final int STATS_THREAD_EXECUTION_TIME= 30000;
+ public static final int STATS_THREAD_EXECUTION_TIME= 15000;
//Local caching of stats
private final ConcurrentMap<NodeId,NodeStatisticsAger> statisticsCache =
for(NodeStatisticsAger nodeStatisticsAger : statisticsCache.values()){
nodeStatisticsAger.cleanStaleStatistics();
}
+ multipartMessageManager.cleanStaleTransactionIds();
Thread.sleep(STATS_THREAD_EXECUTION_TIME);
}catch (Exception e){
}
private void registerDataStoreUpdateListener(DataBrokerService dbs) {
+ //Register for Node updates
+ InstanceIdentifier<? extends DataObject> pathNode = InstanceIdentifier.builder(Nodes.class)
+ .child(Node.class).toInstance();
+ dbs.registerDataChangeListener(pathNode, statsUpdateHandler);
+
//Register for flow updates
InstanceIdentifier<? extends DataObject> pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class)
.augmentation(FlowCapableNode.class)
for (Node targetNode : targetNodes){
if(targetNode.getAugmentation(FlowCapableNode.class) != null){
-
- spLogger.info("Send request for stats collection to node : {})",targetNode.getId());
-
- InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
-
- NodeRef targetNodeRef = new NodeRef(targetInstanceId);
-
- try{
- sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
-
- sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
-
- sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
-
- sendAllFlowTablesStatisticsRequest(targetNodeRef);
-
- sendAllQueueStatsFromAllNodeConnector (targetNodeRef);
-
- sendAllGroupStatisticsRequest(targetNodeRef);
-
- sendAllMeterStatisticsRequest(targetNodeRef);
-
- sendGroupDescriptionRequest(targetNodeRef);
-
- sendMeterConfigStatisticsRequest(targetNodeRef);
- }catch(Exception e){
- spLogger.error("Exception occured while sending statistics requests : {}", e);
- }
+ sendStatisticsRequestsToNode(targetNode);
}
}
}
+
+ public void sendStatisticsRequestsToNode(Node targetNode){
+
+ spLogger.debug("Send requests for statistics collection to node : {})",targetNode.getId());
+
+ InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
+
+ NodeRef targetNodeRef = new NodeRef(targetInstanceId);
+
+ try{
+ if(flowStatsService != null){
+ sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
+ sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
+ }
+ if(flowTableStatsService != null){
+ sendAllFlowTablesStatisticsRequest(targetNodeRef);
+ }
+ if(portStatsService != null){
+ sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
+ }
+ if(groupStatsService != null){
+ sendAllGroupStatisticsRequest(targetNodeRef);
+ sendGroupDescriptionRequest(targetNodeRef);
+ }
+ if(meterStatsService != null){
+ sendAllMeterStatisticsRequest(targetNodeRef);
+ sendMeterConfigStatisticsRequest(targetNodeRef);
+ }
+ if(queueStatsService != null){
+ sendAllQueueStatsFromAllNodeConnector (targetNodeRef);
+ }
+ }catch(Exception e){
+ spLogger.error("Exception occured while sending statistics requests : {}", e);
+ }
+ }
+
public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
final GetFlowTablesStatisticsInputBuilder input =
Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
flowTableStatsService.getFlowTablesStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNodeRef),response.get().getResult().getTransactionId()
, StatsRequestType.ALL_FLOW_TABLE);
}
Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_FLOW);
}
Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response =
flowStatsService.getFlowStatisticsFromFlowTable(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_FLOW);
}
if(tablesId.size() != 0){
for(Short id : tablesId){
- spLogger.info("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey);
- GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
- new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
-
- input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
- input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(id));
- Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
- flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
-
- multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), id);
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
- , StatsRequestType.AGGR_FLOW);
+ sendAggregateFlowsStatsFromTableRequest(targetNodeKey,id);
}
}else{
spLogger.debug("No details found in data store for flow tables associated with Node {}",targetNodeKey);
}
}
+
+ public void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
+
+ spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey);
+ GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
+ new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
+
+ input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
+ input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId));
+ Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
+ flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
+
+ multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), tableId);
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId()
+ , StatsRequestType.AGGR_FLOW);
+ }
public void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
portStatsService.getAllNodeConnectorsStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_PORT);
}
Future<RpcResult<GetAllGroupStatisticsOutput>> response =
groupStatsService.getAllGroupStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_GROUP);
}
Future<RpcResult<GetGroupDescriptionOutput>> response =
groupStatsService.getGroupDescription(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.GROUP_DESC);
}
Future<RpcResult<GetAllMeterStatisticsOutput>> response =
meterStatsService.getAllMeterStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_METER);;
}
Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
meterStatsService.getAllMeterConfigStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.METER_CONFIG);;
}
Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_QUEUE_STATS);;
}
Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response =
queueStatsService.getQueueStatisticsFromGivenPort(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_QUEUE_STATS);;
}
if(nodes == null)
return null;
- spLogger.info("Number of connected nodes : {}",nodes.getNode().size());
+ spLogger.debug("Number of connected nodes : {}",nodes.getNode().size());
return nodes.getNode();
}
FlowCapableNode node = (FlowCapableNode)dps.readOperationalData(nodesIdentifier);
List<Short> tablesId = new ArrayList<Short>();
if(node != null && node.getTable()!=null){
- spLogger.info("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
+ spLogger.debug("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
for(Table table: node.getTable()){
tablesId.add(table.getId());
}
return tablesId;
}
+ @SuppressWarnings("unchecked")
+ private NodeId getNodeId(NodeRef nodeRef){
+ InstanceIdentifier<Node> nodeII = (InstanceIdentifier<Node>) nodeRef.getValue();
+ NodeKey nodeKey = InstanceIdentifier.keyOf(nodeII);
+ return nodeKey.getId();
+ }
+
@SuppressWarnings("deprecation")
@Override
public void close(){