From 86f2711e18784bfd321e6409370bcc11512f4d43 Mon Sep 17 00:00:00 2001 From: Anil Vishnoi Date: Thu, 23 Jan 2014 14:43:43 +0530 Subject: [PATCH] Gerrit contains following minor enhancements: 1) Send statistics requests whenever new flow capable node connects to the controller 2) Clean up transaction-id cache for expired Ids 3) Remove Tx id when last part of multipart response received. Change-Id: I4055b7e7ad10a67e78bafd3b977db642fe5b1ee3 Signed-off-by: Anil Vishnoi --- .../manager/MultipartMessageManager.java | 122 ++++++++++++++++-- .../manager/StatisticsProvider.java | 100 ++++++++------ .../manager/StatisticsUpdateCommiter.java | 37 +++--- .../manager/StatisticsUpdateHandler.java | 13 ++ 4 files changed, 201 insertions(+), 71 deletions(-) diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MultipartMessageManager.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MultipartMessageManager.java index 998d5d8faa..dfe356bdb1 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MultipartMessageManager.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MultipartMessageManager.java @@ -7,10 +7,13 @@ */ package org.opendaylight.controller.md.statistics.manager; +import java.util.Date; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; /** * Main responsibility of the class is to manage multipart response @@ -27,32 +30,119 @@ public class MultipartMessageManager { * response for which it didn't send the request. */ - private static Map txIdToRequestTypeMap = new ConcurrentHashMap(); + private static Map txIdToRequestTypeMap = new ConcurrentHashMap(); /* * Map to keep track of the request tx id for flow table statistics request. * Because flow table statistics multi part response do not contains the table id. */ - private static Map txIdTotableIdMap = new ConcurrentHashMap(); + private static Map txIdTotableIdMap = new ConcurrentHashMap(); + private final int NUMBER_OF_WAIT_CYCLES =2; + + class TxIdEntry{ + private final TransactionId txId; + private final NodeId nodeId; + private final StatsRequestType requestType; + + public TxIdEntry(NodeId nodeId, TransactionId txId, StatsRequestType requestType){ + this.txId = txId; + this.nodeId = nodeId; + this.requestType = requestType; + } + public TransactionId getTxId() { + return txId; + } + public NodeId getNodeId() { + return nodeId; + } + public StatsRequestType getRequestType() { + return requestType; + } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + getOuterType().hashCode(); + result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode()); + result = prime * result + ((txId == null) ? 0 : txId.hashCode()); + return result; + } + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof TxIdEntry)) { + return false; + } + TxIdEntry other = (TxIdEntry) obj; + if (!getOuterType().equals(other.getOuterType())) { + return false; + } + if (nodeId == null) { + if (other.nodeId != null) { + return false; + } + } else if (!nodeId.equals(other.nodeId)) { + return false; + } + if (txId == null) { + if (other.txId != null) { + return false; + } + } else if (!txId.equals(other.txId)) { + return false; + } + return true; + } + private MultipartMessageManager getOuterType() { + return MultipartMessageManager.this; + } + @Override + public String toString() { + return "TxIdEntry [txId=" + txId + ", nodeId=" + nodeId + ", requestType=" + requestType + "]"; + } + } + public MultipartMessageManager(){} - public Short getTableIdForTxId(TransactionId id){ + public Short getTableIdForTxId(NodeId nodeId,TransactionId id){ - return txIdTotableIdMap.get(id); + return txIdTotableIdMap.get(new TxIdEntry(nodeId,id,null)); } - public void setTxIdAndTableIdMapEntry(TransactionId id,Short tableId){ - txIdTotableIdMap.put(id, tableId); + public void setTxIdAndTableIdMapEntry(NodeId nodeId, TransactionId id,Short tableId){ + + txIdTotableIdMap.put(new TxIdEntry(nodeId,id,null), tableId); } - public void addTxIdToRequestTypeEntry (TransactionId id,StatsRequestType type){ - txIdToRequestTypeMap.put(id, type); + public boolean isRequestTxIdExist(NodeId nodeId, TransactionId id, Boolean moreRepliesToFollow){ + TxIdEntry entry = new TxIdEntry(nodeId,id,null); + if(moreRepliesToFollow.booleanValue()){ + return txIdToRequestTypeMap.containsKey(entry); + }else{ + return txIdToRequestTypeMap.remove(entry)==null?false:true; + } } - public StatsRequestType removeTxId(TransactionId id){ - return txIdToRequestTypeMap.remove(id); + public void addTxIdToRequestTypeEntry (NodeId nodeId, TransactionId id,StatsRequestType type){ + TxIdEntry entry = new TxIdEntry(nodeId,id,type); + txIdToRequestTypeMap.put(entry, getExpiryTime()); + } + public boolean removeTxId(NodeId nodeId, TransactionId id){ + TxIdEntry entry = new TxIdEntry(nodeId,id,null); + return txIdToRequestTypeMap.remove(entry)==null?false:true; } + private Date getExpiryTime(){ + Date expires = new Date(); + expires.setTime(expires.getTime()+StatisticsProvider.STATS_THREAD_EXECUTION_TIME*NUMBER_OF_WAIT_CYCLES); + return expires; + } + public enum StatsRequestType{ ALL_FLOW, AGGR_FLOW, @@ -64,4 +154,16 @@ public class MultipartMessageManager { GROUP_DESC, METER_CONFIG } + + public void cleanStaleTransactionIds(){ + for (Iterator it = txIdToRequestTypeMap.keySet().iterator();it.hasNext();){ + TxIdEntry txIdEntry = it.next(); + Date now = new Date(); + Date expiryTime = txIdToRequestTypeMap.get(txIdEntry); + if(now.after(expiryTime)){ + it.remove(); + txIdTotableIdMap.remove(txIdEntry); + } + } + } } diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java index 7b7403f1c3..491ff4bf44 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java @@ -115,7 +115,7 @@ public class StatisticsProvider implements AutoCloseable { private final InstanceIdentifier nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance(); - public static final int STATS_THREAD_EXECUTION_TIME= 30000; + public static final int STATS_THREAD_EXECUTION_TIME= 15000; //Local caching of stats private final ConcurrentMap statisticsCache = @@ -211,6 +211,7 @@ public class StatisticsProvider implements AutoCloseable { for(NodeStatisticsAger nodeStatisticsAger : statisticsCache.values()){ nodeStatisticsAger.cleanStaleStatistics(); } + multipartMessageManager.cleanStaleTransactionIds(); Thread.sleep(STATS_THREAD_EXECUTION_TIME); }catch (Exception e){ @@ -228,6 +229,11 @@ public class StatisticsProvider implements AutoCloseable { } private void registerDataStoreUpdateListener(DataBrokerService dbs) { + //Register for Node updates + InstanceIdentifier pathNode = InstanceIdentifier.builder(Nodes.class) + .child(Node.class).toInstance(); + dbs.registerDataChangeListener(pathNode, statsUpdateHandler); + //Register for flow updates InstanceIdentifier pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class) .augmentation(FlowCapableNode.class) @@ -273,37 +279,42 @@ public class StatisticsProvider implements AutoCloseable { for (Node targetNode : targetNodes){ if(targetNode.getAugmentation(FlowCapableNode.class) != null){ + sendStatisticsRequestsToNode(targetNode); + } + } + } + + public void sendStatisticsRequestsToNode(Node targetNode){ + + spLogger.debug("Send requests for statistics collection to node : {})",targetNode.getId()); + + InstanceIdentifier targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance(); + + NodeRef targetNodeRef = new NodeRef(targetInstanceId); + + try{ + sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey()); + + sendAllFlowsStatsFromAllTablesRequest(targetNodeRef); - spLogger.info("Send request for stats collection to node : {})",targetNode.getId()); - - InstanceIdentifier targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance(); - - NodeRef targetNodeRef = new NodeRef(targetInstanceId); - - try{ - sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey()); - - sendAllFlowsStatsFromAllTablesRequest(targetNodeRef); + sendAllNodeConnectorsStatisticsRequest(targetNodeRef); + + sendAllFlowTablesStatisticsRequest(targetNodeRef); + + sendAllQueueStatsFromAllNodeConnector (targetNodeRef); - sendAllNodeConnectorsStatisticsRequest(targetNodeRef); - - sendAllFlowTablesStatisticsRequest(targetNodeRef); - - sendAllQueueStatsFromAllNodeConnector (targetNodeRef); - - sendAllGroupStatisticsRequest(targetNodeRef); - - sendAllMeterStatisticsRequest(targetNodeRef); - - sendGroupDescriptionRequest(targetNodeRef); - - sendMeterConfigStatisticsRequest(targetNodeRef); - }catch(Exception e){ - spLogger.error("Exception occured while sending statistics requests : {}", e); - } - } + sendAllGroupStatisticsRequest(targetNodeRef); + + sendAllMeterStatisticsRequest(targetNodeRef); + + sendGroupDescriptionRequest(targetNodeRef); + + sendMeterConfigStatisticsRequest(targetNodeRef); + }catch(Exception e){ + spLogger.error("Exception occured while sending statistics requests : {}", e); } } + public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException { final GetFlowTablesStatisticsInputBuilder input = @@ -314,7 +325,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = flowTableStatsService.getFlowTablesStatistics(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() + this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNodeRef),response.get().getResult().getTransactionId() , StatsRequestType.ALL_FLOW_TABLE); } @@ -328,7 +339,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() + this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId() , StatsRequestType.ALL_FLOW); } @@ -343,7 +354,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = flowStatsService.getFlowStatisticsFromFlowTable(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() + this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId() , StatsRequestType.ALL_FLOW); } @@ -355,7 +366,7 @@ public class StatisticsProvider implements AutoCloseable { if(tablesId.size() != 0){ for(Short id : tablesId){ - spLogger.info("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey); + spLogger.debug("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey); GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder(); @@ -364,8 +375,8 @@ public class StatisticsProvider implements AutoCloseable { Future> response = flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build()); - multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), id); - this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() + multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), id); + this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId() , StatsRequestType.AGGR_FLOW); } }else{ @@ -381,7 +392,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = portStatsService.getAllNodeConnectorsStatistics(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() + this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId() , StatsRequestType.ALL_PORT); } @@ -395,7 +406,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = groupStatsService.getAllGroupStatistics(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() + this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId() , StatsRequestType.ALL_GROUP); } @@ -408,7 +419,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = groupStatsService.getGroupDescription(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() + this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId() , StatsRequestType.GROUP_DESC); } @@ -422,7 +433,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = meterStatsService.getAllMeterStatistics(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() + this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId() , StatsRequestType.ALL_METER);; } @@ -436,7 +447,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = meterStatsService.getAllMeterConfigStatistics(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() + this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId() , StatsRequestType.METER_CONFIG);; } @@ -449,7 +460,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() + this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId() , StatsRequestType.ALL_QUEUE_STATS);; } @@ -463,7 +474,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = queueStatsService.getQueueStatisticsFromGivenPort(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() + this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId() , StatsRequestType.ALL_QUEUE_STATS);; } @@ -496,6 +507,13 @@ public class StatisticsProvider implements AutoCloseable { return tablesId; } + @SuppressWarnings("unchecked") + private NodeId getNodeId(NodeRef nodeRef){ + InstanceIdentifier nodeII = (InstanceIdentifier) nodeRef.getValue(); + NodeKey nodeKey = InstanceIdentifier.keyOf(nodeII); + return nodeKey.getId(); + } + @SuppressWarnings("deprecation") @Override public void close(){ diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java index ace547a03c..07dcd0f6aa 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java @@ -124,12 +124,14 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList public final static Logger sucLogger = LoggerFactory.getLogger(StatisticsUpdateCommiter.class); private final StatisticsProvider statisticsManager; + private final MultipartMessageManager messageManager; private int unaccountedFlowsCounter = 1; public StatisticsUpdateCommiter(final StatisticsProvider manager){ this.statisticsManager = manager; + this.messageManager = this.statisticsManager.getMultipartMessageManager(); } public StatisticsProvider getStatisticsManager(){ @@ -139,7 +141,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList @Override public void onMeterConfigStatsUpdated(MeterConfigStatsUpdated notification) { //Check if response is for the request statistics-manager sent. - if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null) + if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies())) return; NodeKey key = new NodeKey(notification.getId()); @@ -181,7 +183,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList public void onMeterStatisticsUpdated(MeterStatisticsUpdated notification) { //Check if response is for the request statistics-manager sent. - if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null) + if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies())) return; NodeKey key = new NodeKey(notification.getId()); @@ -216,7 +218,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList public void onGroupDescStatsUpdated(GroupDescStatsUpdated notification) { //Check if response is for the request statistics-manager sent. - if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null) + if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies())) return; NodeKey key = new NodeKey(notification.getId()); @@ -259,7 +261,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList public void onGroupStatisticsUpdated(GroupStatisticsUpdated notification) { //Check if response is for the request statistics-manager sent. - if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null) + if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies())) return; //Publish data to configuration data store @@ -350,7 +352,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList public void onFlowsStatisticsUpdate(FlowsStatisticsUpdate notification) { //Check if response is for the request statistics-manager sent. - if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null) + if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies())) return; NodeKey key = new NodeKey(notification.getId()); @@ -454,7 +456,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList .child(Flow.class,existingFlow.getKey()).toInstance(); flowBuilder.setKey(existingFlow.getKey()); flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build()); - sucLogger.info("Found matching flow in the datastore, augmenting statistics"); + sucLogger.debug("Found matching flow in the datastore, augmenting statistics"); foundOriginalFlow = true; it.putOperationalData(flowRef, flowBuilder.build()); it.commit(); @@ -479,7 +481,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList .child(Flow.class,existingFlow.getKey()).toInstance(); flowBuilder.setKey(existingFlow.getKey()); flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build()); - sucLogger.debug("Found matching flow in the operational datastore, augmenting statistics"); + sucLogger.debug("Found matching unaccounted flow in the operational datastore, augmenting statistics"); foundOriginalFlow = true; it.putOperationalData(flowRef, flowBuilder.build()); it.commit(); @@ -489,7 +491,6 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList } } if(!foundOriginalFlow){ - sucLogger.debug("Associated original flow is not found in data store. Augmenting flow in operational data store"); long flowKey = Long.parseLong(new String("1"+Short.toString(tableId)+"0"+Integer.toString(this.unaccountedFlowsCounter))); this.unaccountedFlowsCounter++; FlowKey newFlowKey = new FlowKey(new FlowId(Long.toString(flowKey))); @@ -499,7 +500,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList .child(Flow.class,newFlowKey).toInstance(); flowBuilder.setKey(newFlowKey); flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build()); - sucLogger.info("Flow was no present in data store, augmenting statistics as an unaccounted flow"); + sucLogger.info("Flow {} is not present in config data store, augmenting statistics as an unaccounted flow",flowBuilder.build()); it.putOperationalData(flowRef, flowBuilder.build()); it.commit(); } @@ -509,13 +510,12 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList @Override public void onAggregateFlowStatisticsUpdate(AggregateFlowStatisticsUpdate notification) { //Check if response is for the request statistics-manager sent. - if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null) + if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies())) return; NodeKey key = new NodeKey(notification.getId()); - sucLogger.debug("Received aggregate flow statistics update : {}",notification.toString()); - Short tableId = this.statisticsManager.getMultipartMessageManager().getTableIdForTxId(notification.getTransactionId()); + Short tableId = messageManager.getTableIdForTxId(notification.getId(),notification.getTransactionId()); if(tableId != null){ DataModificationTransaction it = this.statisticsManager.startChange(); @@ -544,11 +544,10 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList @Override public void onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate notification) { //Check if response is for the request statistics-manager sent. - if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null) + if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies())) return; NodeKey key = new NodeKey(notification.getId()); - sucLogger.debug("Received port stats update : {}",notification.toString()); List portsStats = notification.getNodeConnectorStatisticsAndPortNumberMap(); for(NodeConnectorStatisticsAndPortNumberMap portStats : portsStats){ @@ -592,11 +591,10 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList @Override public void onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate notification) { //Check if response is for the request statistics-manager sent. - if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null) + if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies())) return; NodeKey key = new NodeKey(notification.getId()); - sucLogger.debug("Received flow table statistics update : {}",notification.toString()); List flowTablesStatsList = notification.getFlowTableAndStatisticsMap(); for (FlowTableAndStatisticsMap ftStats : flowTablesStatsList){ @@ -629,11 +627,10 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList public void onQueueStatisticsUpdate(QueueStatisticsUpdate notification) { //Check if response is for the request statistics-manager sent. - if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null) + if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies())) return; NodeKey key = new NodeKey(notification.getId()); - sucLogger.debug("Received queue stats update : {}",notification.toString()); //Add statistics to local cache ConcurrentMap cache = this.statisticsManager.getStatisticsCache(); @@ -670,7 +667,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList queueBuilder.addAugmentation(FlowCapableNodeConnectorQueueStatisticsData.class, queueStatisticsDataBuilder.build()); queueBuilder.setKey(new QueueKey(swQueueStats.getQueueId())); - sucLogger.info("Augmenting queue statistics {} of queue {} to port {}" + sucLogger.debug("Augmenting queue statistics {} of queue {} to port {}" ,queueStatisticsDataBuilder.build().toString(), swQueueStats.getQueueId(), swQueueStats.getNodeConnectorId()); @@ -686,7 +683,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList InstanceIdentifierBuilder builder = InstanceIdentifier.builder(Nodes.class).child(Node.class, nodeKey); return new NodeRef(builder.toInstance()); } - + public boolean flowEquals(Flow statsFlow, Flow storedFlow) { if (statsFlow.getClass() != storedFlow.getClass()) { return false; diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateHandler.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateHandler.java index f04c29fdd2..941a8f8c2c 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateHandler.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateHandler.java @@ -14,6 +14,7 @@ import java.util.concurrent.ExecutionException; import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent; import org.opendaylight.controller.sal.binding.api.data.DataChangeListener; import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData; @@ -63,6 +64,18 @@ public class StatisticsUpdateHandler implements DataChangeListener { @Override public void onDataChanged(DataChangeEvent, DataObject> change) { + Map, DataObject> nodeAdditions = change.getCreatedOperationalData(); + for (InstanceIdentifier dataObjectInstance : nodeAdditions.keySet()) { + DataObject dataObject = nodeAdditions.get(dataObjectInstance); + if(dataObject instanceof Node){ + + Node node = (Node) dataObject; + if(node.getAugmentation(FlowCapableNode.class) != null){ + this.statisticsManager.sendStatisticsRequestsToNode(node); + } + } + } + Map, DataObject> additions = change.getCreatedConfigurationData(); for (InstanceIdentifier dataObjectInstance : additions.keySet()) { DataObject dataObject = additions.get(dataObjectInstance); -- 2.36.6