1) Send statistics requests whenever new flow capable node connects to the controller
2) Clean up transaction-id cache for expired Ids
3) Remove Tx id when last part of multipart response received.
Change-Id: I4055b7e7ad10a67e78bafd3b977db642fe5b1ee3
Signed-off-by: Anil Vishnoi <avishnoi@in.ibm.com>
*/
package org.opendaylight.controller.md.statistics.manager;
+import java.util.Date;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
/**
* Main responsibility of the class is to manage multipart response
* response for which it didn't send the request.
*/
- private static Map<TransactionId,StatsRequestType> txIdToRequestTypeMap = new ConcurrentHashMap<TransactionId,StatsRequestType>();
+ private static Map<TxIdEntry,Date> txIdToRequestTypeMap = new ConcurrentHashMap<TxIdEntry,Date>();
/*
* Map to keep track of the request tx id for flow table statistics request.
* Because flow table statistics multi part response do not contains the table id.
*/
- private static Map<TransactionId,Short> txIdTotableIdMap = new ConcurrentHashMap<TransactionId,Short>();
+ private static Map<TxIdEntry,Short> txIdTotableIdMap = new ConcurrentHashMap<TxIdEntry,Short>();
+ private final int NUMBER_OF_WAIT_CYCLES =2;
+
+ class TxIdEntry{
+ private final TransactionId txId;
+ private final NodeId nodeId;
+ private final StatsRequestType requestType;
+
+ public TxIdEntry(NodeId nodeId, TransactionId txId, StatsRequestType requestType){
+ this.txId = txId;
+ this.nodeId = nodeId;
+ this.requestType = requestType;
+ }
+ public TransactionId getTxId() {
+ return txId;
+ }
+ public NodeId getNodeId() {
+ return nodeId;
+ }
+ public StatsRequestType getRequestType() {
+ return requestType;
+ }
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + getOuterType().hashCode();
+ result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
+ result = prime * result + ((txId == null) ? 0 : txId.hashCode());
+ return result;
+ }
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof TxIdEntry)) {
+ return false;
+ }
+ TxIdEntry other = (TxIdEntry) obj;
+ if (!getOuterType().equals(other.getOuterType())) {
+ return false;
+ }
+ if (nodeId == null) {
+ if (other.nodeId != null) {
+ return false;
+ }
+ } else if (!nodeId.equals(other.nodeId)) {
+ return false;
+ }
+ if (txId == null) {
+ if (other.txId != null) {
+ return false;
+ }
+ } else if (!txId.equals(other.txId)) {
+ return false;
+ }
+ return true;
+ }
+ private MultipartMessageManager getOuterType() {
+ return MultipartMessageManager.this;
+ }
+ @Override
+ public String toString() {
+ return "TxIdEntry [txId=" + txId + ", nodeId=" + nodeId + ", requestType=" + requestType + "]";
+ }
+ }
+
public MultipartMessageManager(){}
- public Short getTableIdForTxId(TransactionId id){
+ public Short getTableIdForTxId(NodeId nodeId,TransactionId id){
- return txIdTotableIdMap.get(id);
+ return txIdTotableIdMap.get(new TxIdEntry(nodeId,id,null));
}
- public void setTxIdAndTableIdMapEntry(TransactionId id,Short tableId){
- txIdTotableIdMap.put(id, tableId);
+ public void setTxIdAndTableIdMapEntry(NodeId nodeId, TransactionId id,Short tableId){
+
+ txIdTotableIdMap.put(new TxIdEntry(nodeId,id,null), tableId);
}
- public void addTxIdToRequestTypeEntry (TransactionId id,StatsRequestType type){
- txIdToRequestTypeMap.put(id, type);
+ public boolean isRequestTxIdExist(NodeId nodeId, TransactionId id, Boolean moreRepliesToFollow){
+ TxIdEntry entry = new TxIdEntry(nodeId,id,null);
+ if(moreRepliesToFollow.booleanValue()){
+ return txIdToRequestTypeMap.containsKey(entry);
+ }else{
+ return txIdToRequestTypeMap.remove(entry)==null?false:true;
+ }
}
- public StatsRequestType removeTxId(TransactionId id){
- return txIdToRequestTypeMap.remove(id);
+ public void addTxIdToRequestTypeEntry (NodeId nodeId, TransactionId id,StatsRequestType type){
+ TxIdEntry entry = new TxIdEntry(nodeId,id,type);
+ txIdToRequestTypeMap.put(entry, getExpiryTime());
+ }
+ public boolean removeTxId(NodeId nodeId, TransactionId id){
+ TxIdEntry entry = new TxIdEntry(nodeId,id,null);
+ return txIdToRequestTypeMap.remove(entry)==null?false:true;
}
+ private Date getExpiryTime(){
+ Date expires = new Date();
+ expires.setTime(expires.getTime()+StatisticsProvider.STATS_THREAD_EXECUTION_TIME*NUMBER_OF_WAIT_CYCLES);
+ return expires;
+ }
+
public enum StatsRequestType{
ALL_FLOW,
AGGR_FLOW,
GROUP_DESC,
METER_CONFIG
}
+
+ public void cleanStaleTransactionIds(){
+ for (Iterator<TxIdEntry> it = txIdToRequestTypeMap.keySet().iterator();it.hasNext();){
+ TxIdEntry txIdEntry = it.next();
+ Date now = new Date();
+ Date expiryTime = txIdToRequestTypeMap.get(txIdEntry);
+ if(now.after(expiryTime)){
+ it.remove();
+ txIdTotableIdMap.remove(txIdEntry);
+ }
+ }
+ }
}
private final InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
- public static final int STATS_THREAD_EXECUTION_TIME= 30000;
+ public static final int STATS_THREAD_EXECUTION_TIME= 15000;
//Local caching of stats
private final ConcurrentMap<NodeId,NodeStatisticsAger> statisticsCache =
for(NodeStatisticsAger nodeStatisticsAger : statisticsCache.values()){
nodeStatisticsAger.cleanStaleStatistics();
}
+ multipartMessageManager.cleanStaleTransactionIds();
Thread.sleep(STATS_THREAD_EXECUTION_TIME);
}catch (Exception e){
}
private void registerDataStoreUpdateListener(DataBrokerService dbs) {
+ //Register for Node updates
+ InstanceIdentifier<? extends DataObject> pathNode = InstanceIdentifier.builder(Nodes.class)
+ .child(Node.class).toInstance();
+ dbs.registerDataChangeListener(pathNode, statsUpdateHandler);
+
//Register for flow updates
InstanceIdentifier<? extends DataObject> pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class)
.augmentation(FlowCapableNode.class)
for (Node targetNode : targetNodes){
if(targetNode.getAugmentation(FlowCapableNode.class) != null){
+ sendStatisticsRequestsToNode(targetNode);
+ }
+ }
+ }
+
+ public void sendStatisticsRequestsToNode(Node targetNode){
+
+ spLogger.debug("Send requests for statistics collection to node : {})",targetNode.getId());
+
+ InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
+
+ NodeRef targetNodeRef = new NodeRef(targetInstanceId);
+
+ try{
+ sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
+
+ sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
- spLogger.info("Send request for stats collection to node : {})",targetNode.getId());
-
- InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
-
- NodeRef targetNodeRef = new NodeRef(targetInstanceId);
-
- try{
- sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
-
- sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
+ sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
+
+ sendAllFlowTablesStatisticsRequest(targetNodeRef);
+
+ sendAllQueueStatsFromAllNodeConnector (targetNodeRef);
- sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
-
- sendAllFlowTablesStatisticsRequest(targetNodeRef);
-
- sendAllQueueStatsFromAllNodeConnector (targetNodeRef);
-
- sendAllGroupStatisticsRequest(targetNodeRef);
-
- sendAllMeterStatisticsRequest(targetNodeRef);
-
- sendGroupDescriptionRequest(targetNodeRef);
-
- sendMeterConfigStatisticsRequest(targetNodeRef);
- }catch(Exception e){
- spLogger.error("Exception occured while sending statistics requests : {}", e);
- }
- }
+ sendAllGroupStatisticsRequest(targetNodeRef);
+
+ sendAllMeterStatisticsRequest(targetNodeRef);
+
+ sendGroupDescriptionRequest(targetNodeRef);
+
+ sendMeterConfigStatisticsRequest(targetNodeRef);
+ }catch(Exception e){
+ spLogger.error("Exception occured while sending statistics requests : {}", e);
}
}
+
public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
final GetFlowTablesStatisticsInputBuilder input =
Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
flowTableStatsService.getFlowTablesStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNodeRef),response.get().getResult().getTransactionId()
, StatsRequestType.ALL_FLOW_TABLE);
}
Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_FLOW);
}
Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response =
flowStatsService.getFlowStatisticsFromFlowTable(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_FLOW);
}
if(tablesId.size() != 0){
for(Short id : tablesId){
- spLogger.info("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey);
+ spLogger.debug("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey);
GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
- multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), id);
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), id);
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId()
, StatsRequestType.AGGR_FLOW);
}
}else{
Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
portStatsService.getAllNodeConnectorsStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_PORT);
}
Future<RpcResult<GetAllGroupStatisticsOutput>> response =
groupStatsService.getAllGroupStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_GROUP);
}
Future<RpcResult<GetGroupDescriptionOutput>> response =
groupStatsService.getGroupDescription(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.GROUP_DESC);
}
Future<RpcResult<GetAllMeterStatisticsOutput>> response =
meterStatsService.getAllMeterStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_METER);;
}
Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
meterStatsService.getAllMeterConfigStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.METER_CONFIG);;
}
Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_QUEUE_STATS);;
}
Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response =
queueStatsService.getQueueStatisticsFromGivenPort(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_QUEUE_STATS);;
}
return tablesId;
}
+ @SuppressWarnings("unchecked")
+ private NodeId getNodeId(NodeRef nodeRef){
+ InstanceIdentifier<Node> nodeII = (InstanceIdentifier<Node>) nodeRef.getValue();
+ NodeKey nodeKey = InstanceIdentifier.keyOf(nodeII);
+ return nodeKey.getId();
+ }
+
@SuppressWarnings("deprecation")
@Override
public void close(){
public final static Logger sucLogger = LoggerFactory.getLogger(StatisticsUpdateCommiter.class);
private final StatisticsProvider statisticsManager;
+ private final MultipartMessageManager messageManager;
private int unaccountedFlowsCounter = 1;
public StatisticsUpdateCommiter(final StatisticsProvider manager){
this.statisticsManager = manager;
+ this.messageManager = this.statisticsManager.getMultipartMessageManager();
}
public StatisticsProvider getStatisticsManager(){
@Override
public void onMeterConfigStatsUpdated(MeterConfigStatsUpdated notification) {
//Check if response is for the request statistics-manager sent.
- if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
+ if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
return;
NodeKey key = new NodeKey(notification.getId());
public void onMeterStatisticsUpdated(MeterStatisticsUpdated notification) {
//Check if response is for the request statistics-manager sent.
- if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
+ if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
return;
NodeKey key = new NodeKey(notification.getId());
public void onGroupDescStatsUpdated(GroupDescStatsUpdated notification) {
//Check if response is for the request statistics-manager sent.
- if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
+ if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
return;
NodeKey key = new NodeKey(notification.getId());
public void onGroupStatisticsUpdated(GroupStatisticsUpdated notification) {
//Check if response is for the request statistics-manager sent.
- if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
+ if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
return;
//Publish data to configuration data store
public void onFlowsStatisticsUpdate(FlowsStatisticsUpdate notification) {
//Check if response is for the request statistics-manager sent.
- if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
+ if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
return;
NodeKey key = new NodeKey(notification.getId());
.child(Flow.class,existingFlow.getKey()).toInstance();
flowBuilder.setKey(existingFlow.getKey());
flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
- sucLogger.info("Found matching flow in the datastore, augmenting statistics");
+ sucLogger.debug("Found matching flow in the datastore, augmenting statistics");
foundOriginalFlow = true;
it.putOperationalData(flowRef, flowBuilder.build());
it.commit();
.child(Flow.class,existingFlow.getKey()).toInstance();
flowBuilder.setKey(existingFlow.getKey());
flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
- sucLogger.debug("Found matching flow in the operational datastore, augmenting statistics");
+ sucLogger.debug("Found matching unaccounted flow in the operational datastore, augmenting statistics");
foundOriginalFlow = true;
it.putOperationalData(flowRef, flowBuilder.build());
it.commit();
}
}
if(!foundOriginalFlow){
- sucLogger.debug("Associated original flow is not found in data store. Augmenting flow in operational data store");
long flowKey = Long.parseLong(new String("1"+Short.toString(tableId)+"0"+Integer.toString(this.unaccountedFlowsCounter)));
this.unaccountedFlowsCounter++;
FlowKey newFlowKey = new FlowKey(new FlowId(Long.toString(flowKey)));
.child(Flow.class,newFlowKey).toInstance();
flowBuilder.setKey(newFlowKey);
flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
- sucLogger.info("Flow was no present in data store, augmenting statistics as an unaccounted flow");
+ sucLogger.info("Flow {} is not present in config data store, augmenting statistics as an unaccounted flow",flowBuilder.build());
it.putOperationalData(flowRef, flowBuilder.build());
it.commit();
}
@Override
public void onAggregateFlowStatisticsUpdate(AggregateFlowStatisticsUpdate notification) {
//Check if response is for the request statistics-manager sent.
- if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
+ if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
return;
NodeKey key = new NodeKey(notification.getId());
- sucLogger.debug("Received aggregate flow statistics update : {}",notification.toString());
- Short tableId = this.statisticsManager.getMultipartMessageManager().getTableIdForTxId(notification.getTransactionId());
+ Short tableId = messageManager.getTableIdForTxId(notification.getId(),notification.getTransactionId());
if(tableId != null){
DataModificationTransaction it = this.statisticsManager.startChange();
@Override
public void onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate notification) {
//Check if response is for the request statistics-manager sent.
- if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
+ if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
return;
NodeKey key = new NodeKey(notification.getId());
- sucLogger.debug("Received port stats update : {}",notification.toString());
List<NodeConnectorStatisticsAndPortNumberMap> portsStats = notification.getNodeConnectorStatisticsAndPortNumberMap();
for(NodeConnectorStatisticsAndPortNumberMap portStats : portsStats){
@Override
public void onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate notification) {
//Check if response is for the request statistics-manager sent.
- if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
+ if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
return;
NodeKey key = new NodeKey(notification.getId());
- sucLogger.debug("Received flow table statistics update : {}",notification.toString());
List<FlowTableAndStatisticsMap> flowTablesStatsList = notification.getFlowTableAndStatisticsMap();
for (FlowTableAndStatisticsMap ftStats : flowTablesStatsList){
public void onQueueStatisticsUpdate(QueueStatisticsUpdate notification) {
//Check if response is for the request statistics-manager sent.
- if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
+ if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
return;
NodeKey key = new NodeKey(notification.getId());
- sucLogger.debug("Received queue stats update : {}",notification.toString());
//Add statistics to local cache
ConcurrentMap<NodeId, NodeStatisticsAger> cache = this.statisticsManager.getStatisticsCache();
queueBuilder.addAugmentation(FlowCapableNodeConnectorQueueStatisticsData.class, queueStatisticsDataBuilder.build());
queueBuilder.setKey(new QueueKey(swQueueStats.getQueueId()));
- sucLogger.info("Augmenting queue statistics {} of queue {} to port {}"
+ sucLogger.debug("Augmenting queue statistics {} of queue {} to port {}"
,queueStatisticsDataBuilder.build().toString(),
swQueueStats.getQueueId(),
swQueueStats.getNodeConnectorId());
InstanceIdentifierBuilder<?> builder = InstanceIdentifier.builder(Nodes.class).child(Node.class, nodeKey);
return new NodeRef(builder.toInstance());
}
-
+
public boolean flowEquals(Flow statsFlow, Flow storedFlow) {
if (statsFlow.getClass() != storedFlow.getClass()) {
return false;
import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
@Override
public void onDataChanged(DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+ Map<InstanceIdentifier<?>, DataObject> nodeAdditions = change.getCreatedOperationalData();
+ for (InstanceIdentifier<? extends DataObject> dataObjectInstance : nodeAdditions.keySet()) {
+ DataObject dataObject = nodeAdditions.get(dataObjectInstance);
+ if(dataObject instanceof Node){
+
+ Node node = (Node) dataObject;
+ if(node.getAugmentation(FlowCapableNode.class) != null){
+ this.statisticsManager.sendStatisticsRequestsToNode(node);
+ }
+ }
+ }
+
Map<InstanceIdentifier<?>, DataObject> additions = change.getCreatedConfigurationData();
for (InstanceIdentifier<? extends DataObject> dataObjectInstance : additions.keySet()) {
DataObject dataObject = additions.get(dataObjectInstance);