- * 1) Invoke statistics request thread to send periodic statistics request to all the
- * flow capable switch connected to the controller. It sends statistics request for
- * Group,Meter,Table,Flow,Queue,Aggregate stats.
- *
- * 2) Invoke statistics ager thread, to clean up all the stale statistics data from
+ * 1) Invoke statistics request thread to send periodic statistics request to all the
+ * flow capable switch connected to the controller. It sends statistics request for
+ * Group,Meter,Table,Flow,Queue,Aggregate stats.
+ *
+ * 2) Invoke statistics ager thread, to clean up all the stale statistics data from
* @author avishnoi@in.ibm.com
*
*/
public class StatisticsProvider implements AutoCloseable {
public final static Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
* @author avishnoi@in.ibm.com
*
*/
public class StatisticsProvider implements AutoCloseable {
public final static Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
private OpendaylightPortStatisticsService portStatsService;
private OpendaylightFlowTableStatisticsService flowTableStatsService;
private OpendaylightPortStatisticsService portStatsService;
private OpendaylightFlowTableStatisticsService flowTableStatsService;
private OpendaylightQueueStatisticsService queueStatsService;
private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
private OpendaylightQueueStatisticsService queueStatsService;
private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
private Thread statisticsAgerThread;
private final InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
private Thread statisticsAgerThread;
private final InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
public void setDataBrokerService(final DataBrokerService dataBrokerService) {
this.dbs = dataBrokerService;
}
public void setDataBrokerService(final DataBrokerService dataBrokerService) {
this.dbs = dataBrokerService;
}
public void setNotificationService(final NotificationProviderService notificationService) {
this.nps = notificationService;
}
public void setNotificationService(final NotificationProviderService notificationService) {
this.nps = notificationService;
}
NotificationProviderService nps = this.getNotificationService();
Registration<NotificationListener> registerNotificationListener = nps.registerNotificationListener(this.updateCommiter);
this.listenerRegistration = registerNotificationListener;
NotificationProviderService nps = this.getNotificationService();
Registration<NotificationListener> registerNotificationListener = nps.registerNotificationListener(this.updateCommiter);
this.listenerRegistration = registerNotificationListener;
// Get Group/Meter statistics service instance
groupStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightGroupStatisticsService.class);
// Get Group/Meter statistics service instance
groupStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightGroupStatisticsService.class);
meterStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightMeterStatisticsService.class);
meterStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightMeterStatisticsService.class);
flowStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightFlowStatisticsService.class);
flowStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightFlowStatisticsService.class);
flowTableStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightFlowTableStatisticsService.class);
flowTableStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightFlowTableStatisticsService.class);
queueStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightQueueStatisticsService.class);
queueStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightQueueStatisticsService.class);
Thread.sleep(STATS_THREAD_EXECUTION_TIME);
}catch (Exception e){
spLogger.error("Exception occurred while sending stats request : {}",e);
Thread.sleep(STATS_THREAD_EXECUTION_TIME);
}catch (Exception e){
spLogger.error("Exception occurred while sending stats request : {}",e);
Thread.sleep(STATS_THREAD_EXECUTION_TIME);
}catch (Exception e){
spLogger.error("Exception occurred while sending stats request : {}",e);
Thread.sleep(STATS_THREAD_EXECUTION_TIME);
}catch (Exception e){
spLogger.error("Exception occurred while sending stats request : {}",e);
spLogger.debug("Statistics ager thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
statisticsAgerThread.start();
spLogger.debug("Statistics ager thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
statisticsAgerThread.start();
private void registerDataStoreUpdateListener(DataBrokerService dbs) {
//Register for Node updates
InstanceIdentifier<? extends DataObject> pathNode = InstanceIdentifier.builder(Nodes.class)
private void registerDataStoreUpdateListener(DataBrokerService dbs) {
//Register for Node updates
InstanceIdentifier<? extends DataObject> pathNode = InstanceIdentifier.builder(Nodes.class)
.child(Table.class)
.child(Flow.class).toInstance();
dbs.registerDataChangeListener(pathFlow, statsUpdateHandler);
.child(Table.class)
.child(Flow.class).toInstance();
dbs.registerDataChangeListener(pathFlow, statsUpdateHandler);
//Register for meter updates
InstanceIdentifier<? extends DataObject> pathMeter = InstanceIdentifier.builder(Nodes.class).child(Node.class)
.augmentation(FlowCapableNode.class)
.child(Meter.class).toInstance();
dbs.registerDataChangeListener(pathMeter, statsUpdateHandler);
//Register for meter updates
InstanceIdentifier<? extends DataObject> pathMeter = InstanceIdentifier.builder(Nodes.class).child(Node.class)
.augmentation(FlowCapableNode.class)
.child(Meter.class).toInstance();
dbs.registerDataChangeListener(pathMeter, statsUpdateHandler);
InstanceIdentifier<? extends DataObject> pathGroup = InstanceIdentifier.builder(Nodes.class).child(Node.class)
.augmentation(FlowCapableNode.class)
.child(Group.class).toInstance();
InstanceIdentifier<? extends DataObject> pathGroup = InstanceIdentifier.builder(Nodes.class).child(Node.class)
.augmentation(FlowCapableNode.class)
.child(Group.class).toInstance();
if(targetNode.getAugmentation(FlowCapableNode.class) != null){
sendStatisticsRequestsToNode(targetNode);
}
}
}
if(targetNode.getAugmentation(FlowCapableNode.class) != null){
sendStatisticsRequestsToNode(targetNode);
}
}
}
flowTableStatsService.getFlowTablesStatistics(input.build());
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNodeRef),response.get().getResult().getTransactionId()
flowTableStatsService.getFlowTablesStatistics(input.build());
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNodeRef),response.get().getResult().getTransactionId()
public void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
public void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_FLOW);
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_FLOW);
public void sendFlowStatsFromTableRequest(NodeRef targetNode,Flow flow) throws InterruptedException, ExecutionException{
final GetFlowStatisticsFromFlowTableInputBuilder input =
new GetFlowStatisticsFromFlowTableInputBuilder();
public void sendFlowStatsFromTableRequest(NodeRef targetNode,Flow flow) throws InterruptedException, ExecutionException{
final GetFlowStatisticsFromFlowTableInputBuilder input =
new GetFlowStatisticsFromFlowTableInputBuilder();
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_FLOW);
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_FLOW);
sendAggregateFlowsStatsFromTableRequest(targetNodeKey,id);
}
}else{
spLogger.debug("No details found in data store for flow tables associated with Node {}",targetNodeKey);
}
}
sendAggregateFlowsStatsFromTableRequest(targetNodeKey,id);
}
}else{
spLogger.debug("No details found in data store for flow tables associated with Node {}",targetNodeKey);
}
}
public void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
public void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId));
input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId));
multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), tableId);
this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId()
, StatsRequestType.AGGR_FLOW);
}
public void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), tableId);
this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId()
, StatsRequestType.AGGR_FLOW);
}
public void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
portStatsService.getAllNodeConnectorsStatistics(input.build());
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_PORT);
portStatsService.getAllNodeConnectorsStatistics(input.build());
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_PORT);
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_GROUP);
}
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_GROUP);
}
public void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
public void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
groupStatsService.getGroupDescription(input.build());
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.GROUP_DESC);
}
groupStatsService.getGroupDescription(input.build());
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.GROUP_DESC);
}
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_METER);;
}
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_METER);;
}
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.METER_CONFIG);;
}
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.METER_CONFIG);;
}
public void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
public void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_QUEUE_STATS);;
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_QUEUE_STATS);;
public void sendQueueStatsFromGivenNodeConnector(NodeRef targetNode,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder();
public void sendQueueStatsFromGivenNodeConnector(NodeRef targetNode,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder();
input.setNode(targetNode);
input.setNodeConnectorId(nodeConnectorId);
input.setQueueId(queueId);
input.setNode(targetNode);
input.setNodeConnectorId(nodeConnectorId);
input.setQueueId(queueId);
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_QUEUE_STATS);;
}
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_QUEUE_STATS);;
}
- public ConcurrentMap<NodeId, NodeStatisticsAger> getStatisticsCache() {
- return statisticsCache;
+ public final NodeStatisticsAger getStatisticsAger(final NodeId nodeId) {
+ NodeStatisticsAger ager = statisticsCache.get(nodeId);
+ if (ager == null) {
+ ager = new NodeStatisticsAger(this, new NodeKey(nodeId));
+ statisticsCache.put(nodeId, ager);
+ }
+
+ return ager;
private List<Short> getTablesFromNode(NodeKey nodeKey){
InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
private List<Short> getTablesFromNode(NodeKey nodeKey){
InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
FlowCapableNode node = (FlowCapableNode)dps.readOperationalData(nodesIdentifier);
List<Short> tablesId = new ArrayList<Short>();
if(node != null && node.getTable()!=null){
FlowCapableNode node = (FlowCapableNode)dps.readOperationalData(nodesIdentifier);
List<Short> tablesId = new ArrayList<Short>();
if(node != null && node.getTable()!=null){