import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
+/**
* Following are main responsibilities of the class:
- * 1) Invoke statistics request thread to send periodic statistics request to all the
- * flow capable switch connected to the controller. It sends statistics request for
- * Group,Meter,Table,Flow,Queue,Aggregate stats.
- *
- * 2) Invoke statistics ager thread, to clean up all the stale statistics data from
+ * 1) Invoke statistics request thread to send periodic statistics request to all the
+ * flow capable switch connected to the controller. It sends statistics request for
+ * Group,Meter,Table,Flow,Queue,Aggregate stats.
+ *
+ * 2) Invoke statistics ager thread, to clean up all the stale statistics data from
* operational data store.
- *
+ *
* @author avishnoi@in.ibm.com
*
*/
public class StatisticsProvider implements AutoCloseable {
public final static Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
-
+
private DataProviderService dps;
-
+
private DataBrokerService dbs;
private NotificationProviderService nps;
-
+
private OpendaylightGroupStatisticsService groupStatsService;
-
+
private OpendaylightMeterStatisticsService meterStatsService;
-
+
private OpendaylightFlowStatisticsService flowStatsService;
-
+
private OpendaylightPortStatisticsService portStatsService;
private OpendaylightFlowTableStatisticsService flowTableStatsService;
private OpendaylightQueueStatisticsService queueStatsService;
private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
-
+
private StatisticsUpdateHandler statsUpdateHandler;
-
+
private Thread statisticsRequesterThread;
-
+
private Thread statisticsAgerThread;
private final InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
-
+
public static final int STATS_THREAD_EXECUTION_TIME= 15000;
//Local caching of stats
-
- private final ConcurrentMap<NodeId,NodeStatisticsAger> statisticsCache =
+
+ private final ConcurrentMap<NodeId,NodeStatisticsAger> statisticsCache =
new ConcurrentHashMap<NodeId,NodeStatisticsAger>();
-
+
public DataProviderService getDataService() {
return this.dps;
}
-
+
public void setDataService(final DataProviderService dataService) {
this.dps = dataService;
}
-
+
public DataBrokerService getDataBrokerService() {
return this.dbs;
}
-
+
public void setDataBrokerService(final DataBrokerService dataBrokerService) {
this.dbs = dataBrokerService;
}
public NotificationProviderService getNotificationService() {
return this.nps;
}
-
+
public void setNotificationService(final NotificationProviderService notificationService) {
this.nps = notificationService;
}
}
private final StatisticsUpdateCommiter updateCommiter = new StatisticsUpdateCommiter(StatisticsProvider.this);
-
+
private Registration<NotificationListener> listenerRegistration;
-
+
public void start() {
-
+
NotificationProviderService nps = this.getNotificationService();
Registration<NotificationListener> registerNotificationListener = nps.registerNotificationListener(this.updateCommiter);
this.listenerRegistration = registerNotificationListener;
-
+
statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
-
+
registerDataStoreUpdateListener(this.getDataBrokerService());
-
+
// Get Group/Meter statistics service instance
groupStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightGroupStatisticsService.class);
-
+
meterStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightMeterStatisticsService.class);
-
+
flowStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightFlowStatisticsService.class);
flowTableStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightFlowTableStatisticsService.class);
-
+
queueStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightQueueStatisticsService.class);
-
+
statisticsRequesterThread = new Thread( new Runnable(){
@Override
while(true){
try {
statsRequestSender();
-
+
Thread.sleep(STATS_THREAD_EXECUTION_TIME);
}catch (Exception e){
spLogger.error("Exception occurred while sending stats request : {}",e);
}
}
});
-
+
spLogger.debug("Statistics requester thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
-
+
statisticsRequesterThread.start();
-
+
statisticsAgerThread = new Thread( new Runnable(){
@Override
nodeStatisticsAger.cleanStaleStatistics();
}
multipartMessageManager.cleanStaleTransactionIds();
-
+
Thread.sleep(STATS_THREAD_EXECUTION_TIME);
}catch (Exception e){
spLogger.error("Exception occurred while sending stats request : {}",e);
}
}
});
-
+
spLogger.debug("Statistics ager thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
statisticsAgerThread.start();
-
+
spLogger.info("Statistics Provider started.");
}
-
+
private void registerDataStoreUpdateListener(DataBrokerService dbs) {
//Register for Node updates
InstanceIdentifier<? extends DataObject> pathNode = InstanceIdentifier.builder(Nodes.class)
.child(Table.class)
.child(Flow.class).toInstance();
dbs.registerDataChangeListener(pathFlow, statsUpdateHandler);
-
+
//Register for meter updates
InstanceIdentifier<? extends DataObject> pathMeter = InstanceIdentifier.builder(Nodes.class).child(Node.class)
.augmentation(FlowCapableNode.class)
.child(Meter.class).toInstance();
dbs.registerDataChangeListener(pathMeter, statsUpdateHandler);
-
- //Register for group updates
+
+ //Register for group updates
InstanceIdentifier<? extends DataObject> pathGroup = InstanceIdentifier.builder(Nodes.class).child(Node.class)
.augmentation(FlowCapableNode.class)
.child(Group.class).toInstance();
}
protected DataModificationTransaction startChange() {
-
+
DataProviderService dps = this.getDataService();
return dps.beginTransaction();
}
-
+
private void statsRequestSender(){
-
+
List<Node> targetNodes = getAllConnectedNodes();
-
+
if(targetNodes == null)
return;
-
+
for (Node targetNode : targetNodes){
-
+
if(targetNode.getAugmentation(FlowCapableNode.class) != null){
sendStatisticsRequestsToNode(targetNode);
}
}
}
-
+
public void sendStatisticsRequestsToNode(Node targetNode){
-
+
spLogger.debug("Send requests for statistics collection to node : {})",targetNode.getId());
-
+
InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
-
+
NodeRef targetNodeRef = new NodeRef(targetInstanceId);
-
+
try{
if(flowStatsService != null){
sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
spLogger.error("Exception occured while sending statistics requests : {}", e);
}
}
-
+
public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
- final GetFlowTablesStatisticsInputBuilder input =
+ final GetFlowTablesStatisticsInputBuilder input =
new GetFlowTablesStatisticsInputBuilder();
-
+
input.setNode(targetNodeRef);
- Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
+ Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
flowTableStatsService.getFlowTablesStatistics(input.build());
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNodeRef),response.get().getResult().getTransactionId()
public void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
-
+
input.setNode(targetNode);
-
- Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
+
+ Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
-
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_FLOW);
-
+
}
-
+
public void sendFlowStatsFromTableRequest(NodeRef targetNode,Flow flow) throws InterruptedException, ExecutionException{
final GetFlowStatisticsFromFlowTableInputBuilder input =
new GetFlowStatisticsFromFlowTableInputBuilder();
-
+
input.setNode(targetNode);
input.fieldsFrom(flow);
-
- Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response =
+
+ Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response =
flowStatsService.getFlowStatisticsFromFlowTable(input.build());
-
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_FLOW);
-
+
}
public void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
-
+
List<Short> tablesId = getTablesFromNode(targetNodeKey);
-
+
if(tablesId.size() != 0){
for(Short id : tablesId){
-
+
sendAggregateFlowsStatsFromTableRequest(targetNodeKey,id);
}
}else{
spLogger.debug("No details found in data store for flow tables associated with Node {}",targetNodeKey);
}
}
-
+
public void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
-
+
spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey);
- GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
+ GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
-
+
input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId));
- Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
+ Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
-
+
multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), tableId);
this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId()
, StatsRequestType.AGGR_FLOW);
}
public void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
+
final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
-
+
input.setNode(targetNode);
- Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
+ Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
portStatsService.getAllNodeConnectorsStatistics(input.build());
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_PORT);
}
public void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
+
final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
-
+
input.setNode(targetNode);
- Future<RpcResult<GetAllGroupStatisticsOutput>> response =
+ Future<RpcResult<GetAllGroupStatisticsOutput>> response =
groupStatsService.getAllGroupStatistics(input.build());
-
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_GROUP);
}
-
+
public void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
-
+
input.setNode(targetNode);
- Future<RpcResult<GetGroupDescriptionOutput>> response =
+ Future<RpcResult<GetGroupDescriptionOutput>> response =
groupStatsService.getGroupDescription(input.build());
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.GROUP_DESC);
}
-
+
public void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
+
GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
-
+
input.setNode(targetNode);
- Future<RpcResult<GetAllMeterStatisticsOutput>> response =
+ Future<RpcResult<GetAllMeterStatisticsOutput>> response =
meterStatsService.getAllMeterStatistics(input.build());
-
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_METER);;
}
-
+
public void sendMeterConfigStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
+
GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
-
+
input.setNode(targetNode);
- Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
+ Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
meterStatsService.getAllMeterConfigStatistics(input.build());
-
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.METER_CONFIG);;
}
-
+
public void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
-
+
input.setNode(targetNode);
-
- Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
+
+ Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
-
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_QUEUE_STATS);;
public void sendQueueStatsFromGivenNodeConnector(NodeRef targetNode,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder();
-
+
input.setNode(targetNode);
input.setNodeConnectorId(nodeConnectorId);
input.setQueueId(queueId);
- Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response =
+ Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response =
queueStatsService.getQueueStatisticsFromGivenPort(input.build());
-
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_QUEUE_STATS);;
}
- public ConcurrentMap<NodeId, NodeStatisticsAger> getStatisticsCache() {
- return statisticsCache;
+ public final NodeStatisticsAger getStatisticsAger(final NodeId nodeId) {
+ NodeStatisticsAger ager = statisticsCache.get(nodeId);
+ if (ager == null) {
+ ager = new NodeStatisticsAger(this, new NodeKey(nodeId));
+ statisticsCache.put(nodeId, ager);
+ }
+
+ return ager;
}
-
+
private List<Node> getAllConnectedNodes(){
-
Nodes nodes = (Nodes) dps.readOperationalData(nodesIdentifier);
if(nodes == null)
return null;
-
+
spLogger.debug("Number of connected nodes : {}",nodes.getNode().size());
return nodes.getNode();
}
-
+
private List<Short> getTablesFromNode(NodeKey nodeKey){
InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
-
+
FlowCapableNode node = (FlowCapableNode)dps.readOperationalData(nodesIdentifier);
List<Short> tablesId = new ArrayList<Short>();
if(node != null && node.getTable()!=null){
NodeKey nodeKey = InstanceIdentifier.keyOf(nodeII);
return nodeKey.getId();
}
-
+
@SuppressWarnings("deprecation")
@Override
public void close(){
-
+
try {
spLogger.info("Statistics Provider stopped.");
if (this.listenerRegistration != null) {
-
+
this.listenerRegistration.close();
-
+
this.statisticsRequesterThread.destroy();
-
+
this.statisticsAgerThread.destroy();
-
+
}
} catch (Throwable e) {
throw Exceptions.sneakyThrow(e);
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
-import java.util.concurrent.ConcurrentMap;
import org.opendaylight.controller.md.statistics.manager.NodeStatisticsAger.FlowEntry;
import org.opendaylight.controller.md.statistics.manager.NodeStatisticsAger.QueueEntry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
}
@Override
- public void onMeterConfigStatsUpdated(MeterConfigStatsUpdated notification) {
+ public void onMeterConfigStatsUpdated(final MeterConfigStatsUpdated notification) {
//Check if response is for the request statistics-manager sent.
if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
return;
- NodeKey key = new NodeKey(notification.getId());
+ final NodeKey key = new NodeKey(notification.getId());
//Add statistics to local cache
- ConcurrentMap<NodeId, NodeStatisticsAger> cache = this.statisticsManager.getStatisticsCache();
- if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key));
- }
- cache.get(notification.getId()).updateMeterConfigStats(notification.getMeterConfigStats());
+ this.statisticsManager.getStatisticsAger(notification.getId()).updateMeterConfigStats(notification.getMeterConfigStats());
//Publish data to configuration data store
List<MeterConfigStats> meterConfigStatsList = notification.getMeterConfigStats();
if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
return;
- NodeKey key = new NodeKey(notification.getId());
+ final NodeKey key = new NodeKey(notification.getId());
//Publish data to configuration data store
List<MeterStats> meterStatsList = notification.getMeterStats();
if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
return;
- NodeKey key = new NodeKey(notification.getId());
+ final NodeKey key = new NodeKey(notification.getId());
//Add statistics to local cache
- ConcurrentMap<NodeId, NodeStatisticsAger> cache = this.statisticsManager.getStatisticsCache();
- if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key));
- }
- cache.get(notification.getId()).updateGroupDescStats(notification.getGroupDescStats());
+ this.statisticsManager.getStatisticsAger(key.getId()).updateGroupDescStats(notification.getGroupDescStats());
//Publish data to configuration data store
List<GroupDescStats> groupDescStatsList = notification.getGroupDescStats();
}
@Override
- public void onFlowsStatisticsUpdate(FlowsStatisticsUpdate notification) {
+ public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
//Check if response is for the request statistics-manager sent.
if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
return;
- NodeKey key = new NodeKey(notification.getId());
sucLogger.debug("Received flow stats update : {}",notification.toString());
+
+ final NodeKey key = new NodeKey(notification.getId());
+ final NodeStatisticsAger nsa = this.statisticsManager.getStatisticsAger(key.getId());
DataModificationTransaction it = this.statisticsManager.startChange();
for(FlowAndStatisticsMapList map: notification.getFlowAndStatisticsMapList()){
GenericStatistics flowStats = stats.build();
- //Add statistics to local cache
- ConcurrentMap<NodeId, NodeStatisticsAger> cache = this.statisticsManager.getStatisticsCache();
- if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key));
- }
- NodeStatisticsAger nsa = cache.get(notification.getId());
-
//Augment the data to the flow node
FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder();
// Update entry with timestamp of latest response
flow.setKey(existingFlow.getKey());
FlowEntry flowStatsEntry = nsa.new FlowEntry(tableId,flow.build());
- cache.get(notification.getId()).updateFlowStats(flowStatsEntry);
+ nsa.updateFlowStats(flowStatsEntry);
it.putOperationalData(flowRef, flowBuilder.build());
}
// Update entry with timestamp of latest response
flow.setKey(existingFlow.getKey());
FlowEntry flowStatsEntry = nsa.new FlowEntry(tableId,flow.build());
- cache.get(notification.getId()).updateFlowStats(flowStatsEntry);
+ nsa.updateFlowStats(flowStatsEntry);
it.putOperationalData(flowRef, flowBuilder.build());
break;
// Update entry with timestamp of latest response
flow.setKey(newFlowKey);
FlowEntry flowStatsEntry = nsa.new FlowEntry(tableId,flow.build());
- cache.get(notification.getId()).updateFlowStats(flowStatsEntry);
+ nsa.updateFlowStats(flowStatsEntry);
it.putOperationalData(flowRef, flowBuilder.build());
}
NodeKey key = new NodeKey(notification.getId());
//Add statistics to local cache
- ConcurrentMap<NodeId, NodeStatisticsAger> cache = this.statisticsManager.getStatisticsCache();
- if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key));
- }
-
- NodeStatisticsAger nsa = cache.get(notification.getId());
+ NodeStatisticsAger nsa = this.statisticsManager.getStatisticsAger(key.getId());
List<QueueIdAndStatisticsMap> queuesStats = notification.getQueueIdAndStatisticsMap();
DataModificationTransaction it = this.statisticsManager.startChange();