From cf870b9474f14659a68d3fba504962ba859a94fa Mon Sep 17 00:00:00 2001 From: Anil Vishnoi Date: Tue, 21 Jan 2014 16:04:25 +0530 Subject: [PATCH] Gerrit contains following changes : 1) Fix for bug 284. Added functionality that listen to data store *remove* changes and cleanup the relevant statistics from operational data store. 2) Added functionality that listen to config data store *create* changes for Flow,Group, Meter & Queue and send statistics request to switch to get respective stats. 3) Added functionality to periodically remove stale stats from operational data store. Clean up thread invokes after every two cycle of stats collection. 4) Removed unnecessary local caching. Change-Id: Ibee3c73905ce872302c4f54ce5b7b53c0657ee51 Signed-off-by: Anil Vishnoi --- .../md/statistics/manager/NodeStatistics.java | 140 ------- .../manager/NodeStatisticsAger.java | 346 ++++++++++++++++++ .../manager/StatisticsManagerActivator.java | 3 + .../manager/StatisticsProvider.java | 332 +++++++++++------ .../manager/StatisticsUpdateCommiter.java | 338 ++++++++--------- .../manager/StatisticsUpdateHandler.java | 156 ++++++++ 6 files changed, 888 insertions(+), 427 deletions(-) delete mode 100644 opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatistics.java create mode 100644 opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsAger.java create mode 100644 opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateHandler.java diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatistics.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatistics.java deleted file mode 100644 index e84b437b53..0000000000 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatistics.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Copyright IBM Corporation, 2013. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.md.statistics.manager; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatistics; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeatures; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; -import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeatures; -import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats; -import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats; -import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericQueueStatistics; -import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics; -import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericTableStatistics; -import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.NodeConnectorStatistics; - -public class NodeStatistics { - - private NodeRef targetNode; - - private List groupStatistics; - - private List meterStatistics; - - private List groupDescStats; - - private List meterConfigStats; - - private GroupFeatures groupFeatures; - - private MeterFeatures meterFeatures; - - private final Map> flowAndStatsMap= - new HashMap>(); - - private final Map tableAndAggregateFlowStatsMap = - new HashMap(); - - private final Map nodeConnectorStats = - new ConcurrentHashMap(); - - private final Map flowTableAndStatisticsMap = - new HashMap(); - - private final Map> NodeConnectorAndQueuesStatsMap = - new HashMap>(); - - public NodeStatistics(){ - - } - - public NodeRef getTargetNode() { - return targetNode; - } - - public void setTargetNode(NodeRef targetNode) { - this.targetNode = targetNode; - } - - public List getGroupStatistics() { - return groupStatistics; - } - - public void setGroupStatistics(List groupStatistics) { - this.groupStatistics = groupStatistics; - } - - public List getMeterStatistics() { - return meterStatistics; - } - - public void setMeterStatistics(List meterStatistics) { - this.meterStatistics = meterStatistics; - } - - public List getGroupDescStats() { - return groupDescStats; - } - - public void setGroupDescStats(List groupDescStats) { - this.groupDescStats = groupDescStats; - } - - public List getMeterConfigStats() { - return meterConfigStats; - } - - public void setMeterConfigStats(List meterConfigStats) { - this.meterConfigStats = meterConfigStats; - } - - public GroupFeatures getGroupFeatures() { - return groupFeatures; - } - - public void setGroupFeatures(GroupFeatures groupFeatures) { - this.groupFeatures = groupFeatures; - } - - public MeterFeatures getMeterFeatures() { - return meterFeatures; - } - - public void setMeterFeatures(MeterFeatures meterFeatures) { - this.meterFeatures = meterFeatures; - } - - public Map> getFlowAndStatsMap() { - return flowAndStatsMap; - } - - public Map getFlowTableAndStatisticsMap() { - return flowTableAndStatisticsMap; - } - - public Map getTableAndAggregateFlowStatsMap() { - return tableAndAggregateFlowStatsMap; - } - public Map getNodeConnectorStats() { - return nodeConnectorStats; - } - - public Map> getNodeConnectorAndQueuesStatsMap() { - return NodeConnectorAndQueuesStatsMap; - } -} diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsAger.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsAger.java new file mode 100644 index 0000000000..4ecd620543 --- /dev/null +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsAger.java @@ -0,0 +1,346 @@ +/* + * Copyright IBM Corporation, 2013. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.statistics.manager; + +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStats; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats; +import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder; + +/** + * Main responsibility of this class to clean up all the stale statistics data + * associated to Flow,Meter,Group,Queue. + * @author avishnoi@in.ibm.com + * + */ +public class NodeStatisticsAger { + + private final int NUMBER_OF_WAIT_CYCLES =2; + + private final StatisticsProvider statisticsProvider; + + private final NodeKey targetNodeKey; + + private final Map groupDescStatsUpdate + = new ConcurrentHashMap(); + + private final Map meterConfigStatsUpdate + = new ConcurrentHashMap(); + + private final Map flowStatsUpdate + = new ConcurrentHashMap(); + + private final Map queuesStatsUpdate + = new ConcurrentHashMap(); + + public NodeStatisticsAger(StatisticsProvider statisticsProvider, NodeKey nodeKey){ + this.targetNodeKey = nodeKey; + this.statisticsProvider = statisticsProvider; + } + + public class FlowEntry{ + private final Short tableId; + private final Flow flow; + + public FlowEntry(Short tableId, Flow flow){ + this.tableId = tableId; + this.flow = flow; + } + + public Short getTableId() { + return tableId; + } + + public Flow getFlow() { + return flow; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + getOuterType().hashCode(); + result = prime * result + ((flow == null) ? 0 : flow.hashCode()); + result = prime * result + ((tableId == null) ? 0 : tableId.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + FlowEntry other = (FlowEntry) obj; + if (!getOuterType().equals(other.getOuterType())) + return false; + if (flow == null) { + if (other.flow != null) + return false; + } else if (!flow.equals(other.flow)) + return false; + if (tableId == null) { + if (other.tableId != null) + return false; + } else if (!tableId.equals(other.tableId)) + return false; + return true; + } + + private NodeStatisticsAger getOuterType() { + return NodeStatisticsAger.this; + } + + } + + public class QueueEntry{ + private final NodeConnectorId nodeConnectorId; + private final QueueId queueId; + public QueueEntry(NodeConnectorId ncId, QueueId queueId){ + this.nodeConnectorId = ncId; + this.queueId = queueId; + } + public NodeConnectorId getNodeConnectorId() { + return nodeConnectorId; + } + public QueueId getQueueId() { + return queueId; + } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + getOuterType().hashCode(); + result = prime * result + ((nodeConnectorId == null) ? 0 : nodeConnectorId.hashCode()); + result = prime * result + ((queueId == null) ? 0 : queueId.hashCode()); + return result; + } + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof QueueEntry)) { + return false; + } + QueueEntry other = (QueueEntry) obj; + if (!getOuterType().equals(other.getOuterType())) { + return false; + } + if (nodeConnectorId == null) { + if (other.nodeConnectorId != null) { + return false; + } + } else if (!nodeConnectorId.equals(other.nodeConnectorId)) { + return false; + } + if (queueId == null) { + if (other.queueId != null) { + return false; + } + } else if (!queueId.equals(other.queueId)) { + return false; + } + return true; + } + private NodeStatisticsAger getOuterType() { + return NodeStatisticsAger.this; + } + } + + public NodeKey getTargetNodeKey() { + return targetNodeKey; + } + + public Map getGroupDescStatsUpdate() { + return groupDescStatsUpdate; + } + + public Map getMeterConfigStatsUpdate() { + return meterConfigStatsUpdate; + } + + public Map getFlowStatsUpdate() { + return flowStatsUpdate; + } + + public Map getQueuesStatsUpdate() { + return queuesStatsUpdate; + } + + public void updateGroupDescStats(List list){ + Date expiryTime = getExpiryTime(); + for(GroupDescStats groupDescStats : list) + this.groupDescStatsUpdate.put(groupDescStats, expiryTime); + } + + public void updateMeterConfigStats(List list){ + Date expiryTime = getExpiryTime(); + for(MeterConfigStats meterConfigStats: list) + this.meterConfigStatsUpdate.put(meterConfigStats, expiryTime); + } + + public void updateFlowStats(FlowEntry flowEntry){ + this.flowStatsUpdate.put(flowEntry, getExpiryTime()); + } + public void updateQueueStats(QueueEntry queueEntry){ + this.queuesStatsUpdate.put(queueEntry, getExpiryTime()); + } + + private Date getExpiryTime(){ + Date expires = new Date(); + expires.setTime(expires.getTime()+StatisticsProvider.STATS_THREAD_EXECUTION_TIME*NUMBER_OF_WAIT_CYCLES); + return expires; + } + + public void cleanStaleStatistics(){ + //Clean stale statistics related to group + for (Iterator it = this.groupDescStatsUpdate.keySet().iterator();it.hasNext();){ + GroupDescStats groupDescStats = it.next(); + Date now = new Date(); + Date expiryTime = this.groupDescStatsUpdate.get(groupDescStats); + if(now.after(expiryTime)){ + cleanGroupStatsFromDataStore(groupDescStats ); + it.remove(); + } + } + + //Clean stale statistics related to meter + for (Iterator it = this.meterConfigStatsUpdate.keySet().iterator();it.hasNext();){ + MeterConfigStats meterConfigStats = it.next(); + Date now = new Date(); + Date expiryTime = this.meterConfigStatsUpdate.get(meterConfigStats); + if(now.after(expiryTime)){ + cleanMeterStatsFromDataStore(meterConfigStats); + it.remove(); + } + } + + //Clean stale statistics related to flow + for (Iterator it = this.flowStatsUpdate.keySet().iterator();it.hasNext();){ + FlowEntry flowEntry = it.next(); + Date now = new Date(); + Date expiryTime = this.flowStatsUpdate.get(flowEntry); + if(now.after(expiryTime)){ + cleanFlowStatsFromDataStore(flowEntry); + it.remove(); + } + } + + //Clean stale statistics related to queue + for (Iterator it = this.queuesStatsUpdate.keySet().iterator();it.hasNext();){ + QueueEntry queueEntry = it.next(); + Date now = new Date(); + Date expiryTime = this.queuesStatsUpdate.get(queueEntry); + if(now.after(expiryTime)){ + cleanQueueStatsFromDataStore(queueEntry); + it.remove(); + } + } + + } + + private void cleanQueueStatsFromDataStore(QueueEntry queueEntry) { + InstanceIdentifier queueRef + = InstanceIdentifier.builder(Nodes.class) + .child(Node.class, this.targetNodeKey) + .child(NodeConnector.class, new NodeConnectorKey(queueEntry.getNodeConnectorId())) + .augmentation(FlowCapableNodeConnector.class) + .child(Queue.class, new QueueKey(queueEntry.getQueueId())) + .augmentation(FlowCapableNodeConnectorQueueStatisticsData.class).toInstance(); + cleanStaleStatisticsFromDataStore(queueRef); + } + + private void cleanFlowStatsFromDataStore(FlowEntry flowEntry) { + InstanceIdentifier flowRef + = InstanceIdentifier.builder(Nodes.class).child(Node.class, this.targetNodeKey) + .augmentation(FlowCapableNode.class) + .child(Table.class, new TableKey(flowEntry.getTableId())) + .child(Flow.class,flowEntry.getFlow().getKey()) + .augmentation(FlowStatisticsData.class).toInstance(); + + cleanStaleStatisticsFromDataStore(flowRef); + + } + + private void cleanMeterStatsFromDataStore(MeterConfigStats meterConfigStats) { + InstanceIdentifierBuilder meterRef + = InstanceIdentifier.builder(Nodes.class).child(Node.class,this.targetNodeKey) + .augmentation(FlowCapableNode.class) + .child(Meter.class,new MeterKey(meterConfigStats.getMeterId())); + + InstanceIdentifier nodeMeterConfigStatsAugmentation = meterRef.augmentation(NodeMeterConfigStats.class).toInstance(); + + cleanStaleStatisticsFromDataStore(nodeMeterConfigStatsAugmentation); + + InstanceIdentifier nodeMeterStatisticsAugmentation = meterRef.augmentation(NodeMeterStatistics.class).toInstance(); + + cleanStaleStatisticsFromDataStore(nodeMeterStatisticsAugmentation); + + } + + private void cleanGroupStatsFromDataStore(GroupDescStats groupDescStats) { + InstanceIdentifierBuilder groupRef + = InstanceIdentifier.builder(Nodes.class).child(Node.class,this.targetNodeKey) + .augmentation(FlowCapableNode.class) + .child(Group.class,new GroupKey(groupDescStats.getGroupId())); + + InstanceIdentifier nodeGroupDescStatsAugmentation = groupRef.augmentation(NodeGroupDescStats.class).toInstance(); + + cleanStaleStatisticsFromDataStore(nodeGroupDescStatsAugmentation); + + InstanceIdentifier nodeGroupStatisticsAugmentation = groupRef.augmentation(NodeGroupStatistics.class).toInstance(); + + cleanStaleStatisticsFromDataStore(nodeGroupStatisticsAugmentation); + } + + private void cleanStaleStatisticsFromDataStore(InstanceIdentifier ii){ + if(ii != null){ + DataModificationTransaction it = this.statisticsProvider.startChange(); + it.removeOperationalData(ii); + it.commit(); + } + } +} diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManagerActivator.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManagerActivator.java index 738c2cb9a8..653cc8081a 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManagerActivator.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManagerActivator.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.md.statistics.manager; import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareProvider; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; +import org.opendaylight.controller.sal.binding.api.data.DataBrokerService; import org.opendaylight.controller.sal.binding.api.data.DataProviderService; import org.osgi.framework.BundleContext; @@ -26,6 +27,8 @@ public class StatisticsManagerActivator extends AbstractBindingAwareProvider { pSession = session; DataProviderService dps = session.getSALService(DataProviderService.class); StatisticsManagerActivator.statsProvider.setDataService(dps); + DataBrokerService dbs = session.getSALService(DataBrokerService.class); + StatisticsManagerActivator.statsProvider.setDataBrokerService(dbs); NotificationProviderService nps = session.getSALService(NotificationProviderService.class); StatisticsManagerActivator.statsProvider.setNotificationService(nps); StatisticsManagerActivator.statsProvider.start(); diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java index 4eaad42738..7b7403f1c3 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java @@ -17,26 +17,37 @@ import java.util.concurrent.Future; import org.eclipse.xtext.xbase.lib.Exceptions; import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; +import org.opendaylight.controller.sal.binding.api.data.DataBrokerService; import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; import org.opendaylight.controller.sal.binding.api.data.DataProviderService; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder; @@ -49,28 +60,45 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.G import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService; import org.opendaylight.yangtools.concepts.Registration; +import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.NotificationListener; import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Following are main responsibilities of the class: + * 1) Invoke statistics request thread to send periodic statistics request to all the + * flow capable switch connected to the controller. It sends statistics request for + * Group,Meter,Table,Flow,Queue,Aggregate stats. + * + * 2) Invoke statistics ager thread, to clean up all the stale statistics data from + * operational data store. + * + * @author avishnoi@in.ibm.com + * + */ public class StatisticsProvider implements AutoCloseable { public final static Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class); - + private DataProviderService dps; + + private DataBrokerService dbs; private NotificationProviderService nps; - + private OpendaylightGroupStatisticsService groupStatsService; - + private OpendaylightMeterStatisticsService meterStatsService; - + private OpendaylightFlowStatisticsService flowStatsService; - + private OpendaylightPortStatisticsService portStatsService; private OpendaylightFlowTableStatisticsService flowTableStatsService; @@ -78,29 +106,41 @@ public class StatisticsProvider implements AutoCloseable { private OpendaylightQueueStatisticsService queueStatsService; private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager(); - + + private StatisticsUpdateHandler statsUpdateHandler; + private Thread statisticsRequesterThread; + + private Thread statisticsAgerThread; private final InstanceIdentifier nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance(); - - private final int STATS_THREAD_EXECUTION_TIME= 50000; + + public static final int STATS_THREAD_EXECUTION_TIME= 30000; //Local caching of stats - - private final ConcurrentMap statisticsCache = - new ConcurrentHashMap(); - + + private final ConcurrentMap statisticsCache = + new ConcurrentHashMap(); + public DataProviderService getDataService() { return this.dps; } - + public void setDataService(final DataProviderService dataService) { this.dps = dataService; } + + public DataBrokerService getDataBrokerService() { + return this.dbs; + } + + public void setDataBrokerService(final DataBrokerService dataBrokerService) { + this.dbs = dataBrokerService; + } public NotificationProviderService getNotificationService() { return this.nps; } - + public void setNotificationService(final NotificationProviderService notificationService) { this.nps = notificationService; } @@ -110,22 +150,26 @@ public class StatisticsProvider implements AutoCloseable { } private final StatisticsUpdateCommiter updateCommiter = new StatisticsUpdateCommiter(StatisticsProvider.this); - + private Registration listenerRegistration; - + public void start() { - + NotificationProviderService nps = this.getNotificationService(); Registration registerNotificationListener = nps.registerNotificationListener(this.updateCommiter); this.listenerRegistration = registerNotificationListener; - + + statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this); + + registerDataStoreUpdateListener(this.getDataBrokerService()); + // Get Group/Meter statistics service instance groupStatsService = StatisticsManagerActivator.getProviderContext(). getRpcService(OpendaylightGroupStatisticsService.class); - + meterStatsService = StatisticsManagerActivator.getProviderContext(). getRpcService(OpendaylightMeterStatisticsService.class); - + flowStatsService = StatisticsManagerActivator.getProviderContext(). getRpcService(OpendaylightFlowStatisticsService.class); @@ -134,10 +178,10 @@ public class StatisticsProvider implements AutoCloseable { flowTableStatsService = StatisticsManagerActivator.getProviderContext(). getRpcService(OpendaylightFlowTableStatisticsService.class); - + queueStatsService = StatisticsManagerActivator.getProviderContext(). getRpcService(OpendaylightQueueStatisticsService.class); - + statisticsRequesterThread = new Thread( new Runnable(){ @Override @@ -145,7 +189,7 @@ public class StatisticsProvider implements AutoCloseable { while(true){ try { statsRequestSender(); - + Thread.sleep(STATS_THREAD_EXECUTION_TIME); }catch (Exception e){ spLogger.error("Exception occurred while sending stats request : {}",e); @@ -153,55 +197,106 @@ public class StatisticsProvider implements AutoCloseable { } } }); - + spLogger.debug("Statistics requester thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME); - + statisticsRequesterThread.start(); + + statisticsAgerThread = new Thread( new Runnable(){ + + @Override + public void run() { + while(true){ + try { + for(NodeStatisticsAger nodeStatisticsAger : statisticsCache.values()){ + nodeStatisticsAger.cleanStaleStatistics(); + } + + Thread.sleep(STATS_THREAD_EXECUTION_TIME); + }catch (Exception e){ + spLogger.error("Exception occurred while sending stats request : {}",e); + } + } + } + }); + + spLogger.debug("Statistics ager thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME); + statisticsAgerThread.start(); + spLogger.info("Statistics Provider started."); } + + private void registerDataStoreUpdateListener(DataBrokerService dbs) { + //Register for flow updates + InstanceIdentifier pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class) + .augmentation(FlowCapableNode.class) + .child(Table.class) + .child(Flow.class).toInstance(); + dbs.registerDataChangeListener(pathFlow, statsUpdateHandler); + + //Register for meter updates + InstanceIdentifier pathMeter = InstanceIdentifier.builder(Nodes.class).child(Node.class) + .augmentation(FlowCapableNode.class) + .child(Meter.class).toInstance(); + + dbs.registerDataChangeListener(pathMeter, statsUpdateHandler); + + //Register for group updates + InstanceIdentifier pathGroup = InstanceIdentifier.builder(Nodes.class).child(Node.class) + .augmentation(FlowCapableNode.class) + .child(Group.class).toInstance(); + dbs.registerDataChangeListener(pathGroup, statsUpdateHandler); + + //Register for queue updates + InstanceIdentifier pathQueue = InstanceIdentifier.builder(Nodes.class).child(Node.class) + .child(NodeConnector.class) + .augmentation(FlowCapableNodeConnector.class) + .child(Queue.class).toInstance(); + dbs.registerDataChangeListener(pathQueue, statsUpdateHandler); + } protected DataModificationTransaction startChange() { - + DataProviderService dps = this.getDataService(); return dps.beginTransaction(); } - + private void statsRequestSender(){ - + List targetNodes = getAllConnectedNodes(); - + if(targetNodes == null) return; - + for (Node targetNode : targetNodes){ - + if(targetNode.getAugmentation(FlowCapableNode.class) != null){ - spLogger.trace("Send request for stats collection to node : {})",targetNode.getId()); - + spLogger.info("Send request for stats collection to node : {})",targetNode.getId()); + InstanceIdentifier targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance(); - + NodeRef targetNodeRef = new NodeRef(targetInstanceId); - + try{ sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey()); - + sendAllFlowsStatsFromAllTablesRequest(targetNodeRef); sendAllNodeConnectorsStatisticsRequest(targetNodeRef); - + sendAllFlowTablesStatisticsRequest(targetNodeRef); - + sendAllQueueStatsFromAllNodeConnector (targetNodeRef); sendAllGroupStatisticsRequest(targetNodeRef); - + sendAllMeterStatisticsRequest(targetNodeRef); - + sendGroupDescriptionRequest(targetNodeRef); - + sendMeterConfigStatisticsRequest(targetNodeRef); }catch(Exception e){ spLogger.error("Exception occured while sending statistics requests : {}", e); @@ -210,13 +305,13 @@ public class StatisticsProvider implements AutoCloseable { } } - private void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException { - final GetFlowTablesStatisticsInputBuilder input = + public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException { + final GetFlowTablesStatisticsInputBuilder input = new GetFlowTablesStatisticsInputBuilder(); - + input.setNode(targetNodeRef); - Future> response = + Future> response = flowTableStatsService.getFlowTablesStatistics(input.build()); this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() @@ -224,36 +319,51 @@ public class StatisticsProvider implements AutoCloseable { } - private void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ + public void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input = new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder(); - + input.setNode(targetNode); - - Future> response = + + Future> response = flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build()); - + this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() , StatsRequestType.ALL_FLOW); - + + } + + public void sendFlowStatsFromTableRequest(NodeRef targetNode,Flow flow) throws InterruptedException, ExecutionException{ + final GetFlowStatisticsFromFlowTableInputBuilder input = + new GetFlowStatisticsFromFlowTableInputBuilder(); + + input.setNode(targetNode); + input.fieldsFrom(flow); + + Future> response = + flowStatsService.getFlowStatisticsFromFlowTable(input.build()); + + this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() + , StatsRequestType.ALL_FLOW); + } - private void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{ - + public void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{ + List tablesId = getTablesFromNode(targetNodeKey); - + if(tablesId.size() != 0){ for(Short id : tablesId){ - - spLogger.trace("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey); - GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = + + spLogger.info("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey); + GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder(); - + input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance())); input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(id)); - Future> response = + Future> response = flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build()); - + multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), id); this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() , StatsRequestType.AGGR_FLOW); @@ -263,108 +373,122 @@ public class StatisticsProvider implements AutoCloseable { } } - private void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ - + public void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ + final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder(); - + input.setNode(targetNode); - Future> response = + Future> response = portStatsService.getAllNodeConnectorsStatistics(input.build()); this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() , StatsRequestType.ALL_PORT); } - private void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ - + public void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ + final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder(); - + input.setNode(targetNode); - Future> response = + Future> response = groupStatsService.getAllGroupStatistics(input.build()); - + this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() , StatsRequestType.ALL_GROUP); } - - private void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ + + public void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder(); - + input.setNode(targetNode); - Future> response = + Future> response = groupStatsService.getGroupDescription(input.build()); this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() , StatsRequestType.GROUP_DESC); } - - private void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ - + + public void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ + GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder(); - + input.setNode(targetNode); - Future> response = + Future> response = meterStatsService.getAllMeterStatistics(input.build()); - + this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() , StatsRequestType.ALL_METER);; } - - private void sendMeterConfigStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ - + + public void sendMeterConfigStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ + GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder(); - + input.setNode(targetNode); - Future> response = + Future> response = meterStatsService.getAllMeterConfigStatistics(input.build()); - + this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() , StatsRequestType.METER_CONFIG);; } - - private void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException { + + public void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException { GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder(); - + input.setNode(targetNode); - - Future> response = + + Future> response = queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build()); + + this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() + , StatsRequestType.ALL_QUEUE_STATS);; + } + + public void sendQueueStatsFromGivenNodeConnector(NodeRef targetNode,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException { + GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder(); + + input.setNode(targetNode); + input.setNodeConnectorId(nodeConnectorId); + input.setQueueId(queueId); + Future> response = + queueStatsService.getQueueStatisticsFromGivenPort(input.build()); + this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId() , StatsRequestType.ALL_QUEUE_STATS);; } - public ConcurrentMap getStatisticsCache() { + public ConcurrentMap getStatisticsCache() { return statisticsCache; } - + private List getAllConnectedNodes(){ - + Nodes nodes = (Nodes) dps.readOperationalData(nodesIdentifier); if(nodes == null) return null; - - spLogger.trace("Number of connected nodes : {}",nodes.getNode().size()); + + spLogger.info("Number of connected nodes : {}",nodes.getNode().size()); return nodes.getNode(); } - + private List getTablesFromNode(NodeKey nodeKey){ InstanceIdentifier nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance(); - + FlowCapableNode node = (FlowCapableNode)dps.readOperationalData(nodesIdentifier); List tablesId = new ArrayList(); if(node != null && node.getTable()!=null){ - spLogger.trace("Number of tables {} supported by node {}",node.getTable().size(),nodeKey); + spLogger.info("Number of tables {} supported by node {}",node.getTable().size(),nodeKey); for(Table table: node.getTable()){ tablesId.add(table.getId()); } @@ -375,15 +499,17 @@ public class StatisticsProvider implements AutoCloseable { @SuppressWarnings("deprecation") @Override public void close(){ - + try { - spLogger.trace("Statistics Provider stopped."); + spLogger.info("Statistics Provider stopped."); if (this.listenerRegistration != null) { - + this.listenerRegistration.close(); - + this.statisticsRequesterThread.destroy(); - + + this.statisticsAgerThread.destroy(); + } } catch (Throwable e) { throw Exceptions.sneakyThrow(e); diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java index 5743865d39..ace547a03c 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java @@ -7,10 +7,11 @@ */ package org.opendaylight.controller.md.statistics.manager; -import java.util.HashMap; import java.util.List; import java.util.concurrent.ConcurrentMap; +import org.opendaylight.controller.md.statistics.manager.NodeStatisticsAger.FlowEntry; +import org.opendaylight.controller.md.statistics.manager.NodeStatisticsAger.QueueEntry; import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector; @@ -44,7 +45,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev13 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupDescStatsUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupFeaturesUpdated; @@ -88,7 +88,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111. import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterStatisticsBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats; -import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericQueueStatistics; import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder; @@ -108,68 +107,68 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Class implement statistics manager related listener interface and augment all the + * Class implement statistics manager related listener interface and augment all the * received statistics data to data stores. - * TODO: Need to add error message listener and clean-up the associated tx id + * TODO: Need to add error message listener and clean-up the associated tx id * if it exists in the tx-id cache. * @author vishnoianil * */ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsListener, - OpendaylightMeterStatisticsListener, + OpendaylightMeterStatisticsListener, OpendaylightFlowStatisticsListener, OpendaylightPortStatisticsListener, OpendaylightFlowTableStatisticsListener, OpendaylightQueueStatisticsListener{ - + public final static Logger sucLogger = LoggerFactory.getLogger(StatisticsUpdateCommiter.class); private final StatisticsProvider statisticsManager; - - private final int unaccountedFlowsCounter = 1; + + private int unaccountedFlowsCounter = 1; public StatisticsUpdateCommiter(final StatisticsProvider manager){ this.statisticsManager = manager; } - + public StatisticsProvider getStatisticsManager(){ return statisticsManager; } - + @Override public void onMeterConfigStatsUpdated(MeterConfigStatsUpdated notification) { //Check if response is for the request statistics-manager sent. if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null) return; + + NodeKey key = new NodeKey(notification.getId()); //Add statistics to local cache - ConcurrentMap cache = this.statisticsManager.getStatisticsCache(); + ConcurrentMap cache = this.statisticsManager.getStatisticsCache(); if(!cache.containsKey(notification.getId())){ - cache.put(notification.getId(), new NodeStatistics()); + cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key)); } - cache.get(notification.getId()).setMeterConfigStats(notification.getMeterConfigStats()); - + cache.get(notification.getId()).updateMeterConfigStats(notification.getMeterConfigStats()); + //Publish data to configuration data store - NodeKey key = new NodeKey(notification.getId()); - - List eterConfigStatsList = notification.getMeterConfigStats(); - - for(MeterConfigStats meterConfigStats : eterConfigStatsList){ + List meterConfigStatsList = notification.getMeterConfigStats(); + + for(MeterConfigStats meterConfigStats : meterConfigStatsList){ DataModificationTransaction it = this.statisticsManager.startChange(); MeterBuilder meterBuilder = new MeterBuilder(); MeterKey meterKey = new MeterKey(meterConfigStats.getMeterId()); meterBuilder.setKey(meterKey); - + InstanceIdentifier meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key) .augmentation(FlowCapableNode.class) .child(Meter.class,meterKey).toInstance(); - + NodeMeterConfigStatsBuilder meterConfig= new NodeMeterConfigStatsBuilder(); MeterConfigStatsBuilder stats = new MeterConfigStatsBuilder(); stats.fieldsFrom(meterConfigStats); meterConfig.setMeterConfigStats(stats.build()); - + //Update augmented data meterBuilder.addAugmentation(NodeMeterConfigStats.class, meterConfig.build()); it.putOperationalData(meterRef, meterBuilder.build()); @@ -180,22 +179,15 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList @Override public void onMeterStatisticsUpdated(MeterStatisticsUpdated notification) { - + //Check if response is for the request statistics-manager sent. if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null) return; - //Add statistics to local cache - ConcurrentMap cache = this.statisticsManager.getStatisticsCache(); - if(!cache.containsKey(notification.getId())){ - cache.put(notification.getId(), new NodeStatistics()); - } - cache.get(notification.getId()).setMeterStatistics(notification.getMeterStats()); - NodeKey key = new NodeKey(notification.getId()); - + List meterStatsList = notification.getMeterStats(); - + for(MeterStats meterStats : meterStatsList){ //Publish data to configuration data store @@ -203,11 +195,11 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList MeterBuilder meterBuilder = new MeterBuilder(); MeterKey meterKey = new MeterKey(meterStats.getMeterId()); meterBuilder.setKey(meterKey); - + InstanceIdentifier meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key) .augmentation(FlowCapableNode.class) .child(Meter.class,meterKey).toInstance(); - + NodeMeterStatisticsBuilder meterStatsBuilder= new NodeMeterStatisticsBuilder(); MeterStatisticsBuilder stats = new MeterStatisticsBuilder(); stats.fieldsFrom(meterStats); @@ -222,29 +214,30 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList @Override public void onGroupDescStatsUpdated(GroupDescStatsUpdated notification) { - + //Check if response is for the request statistics-manager sent. if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null) return; + NodeKey key = new NodeKey(notification.getId()); + //Add statistics to local cache - ConcurrentMap cache = this.statisticsManager.getStatisticsCache(); + ConcurrentMap cache = this.statisticsManager.getStatisticsCache(); if(!cache.containsKey(notification.getId())){ - cache.put(notification.getId(), new NodeStatistics()); + cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key)); } - cache.get(notification.getId()).setGroupDescStats(notification.getGroupDescStats()); - + cache.get(notification.getId()).updateGroupDescStats(notification.getGroupDescStats()); + //Publish data to configuration data store - NodeKey key = new NodeKey(notification.getId()); List groupDescStatsList = notification.getGroupDescStats(); for(GroupDescStats groupDescStats : groupDescStatsList){ DataModificationTransaction it = this.statisticsManager.startChange(); - + GroupBuilder groupBuilder = new GroupBuilder(); GroupKey groupKey = new GroupKey(groupDescStats.getGroupId()); groupBuilder.setKey(groupKey); - + InstanceIdentifier groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key) .augmentation(FlowCapableNode.class) .child(Group.class,groupKey).toInstance(); @@ -253,7 +246,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList GroupDescBuilder stats = new GroupDescBuilder(); stats.fieldsFrom(groupDescStats); groupDesc.setGroupDesc(stats.build()); - + //Update augmented data groupBuilder.addAugmentation(NodeGroupDescStats.class, groupDesc.build()); @@ -264,29 +257,22 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList @Override public void onGroupStatisticsUpdated(GroupStatisticsUpdated notification) { - + //Check if response is for the request statistics-manager sent. if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null) return; - //Add statistics to local cache - ConcurrentMap cache = this.statisticsManager.getStatisticsCache(); - if(!cache.containsKey(notification.getId())){ - cache.put(notification.getId(), new NodeStatistics()); - } - cache.get(notification.getId()).setGroupStatistics(notification.getGroupStats()); - //Publish data to configuration data store NodeKey key = new NodeKey(notification.getId()); List groupStatsList = notification.getGroupStats(); for(GroupStats groupStats : groupStatsList){ DataModificationTransaction it = this.statisticsManager.startChange(); - + GroupBuilder groupBuilder = new GroupBuilder(); GroupKey groupKey = new GroupKey(groupStats.getGroupId()); groupBuilder.setKey(groupKey); - + InstanceIdentifier groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key) .augmentation(FlowCapableNode.class) .child(Group.class,groupKey).toInstance(); @@ -295,80 +281,66 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList GroupStatisticsBuilder stats = new GroupStatisticsBuilder(); stats.fieldsFrom(groupStats); groupStatisticsBuilder.setGroupStatistics(stats.build()); - + //Update augmented data groupBuilder.addAugmentation(NodeGroupStatistics.class, groupStatisticsBuilder.build()); it.putOperationalData(groupRef, groupBuilder.build()); it.commit(); } } - + @Override public void onMeterFeaturesUpdated(MeterFeaturesUpdated notification) { - //Add statistics to local cache - ConcurrentMap cache = this.statisticsManager.getStatisticsCache(); - if(!cache.containsKey(notification.getId())){ - cache.put(notification.getId(), new NodeStatistics()); - } MeterFeaturesBuilder meterFeature = new MeterFeaturesBuilder(); meterFeature.setMeterBandSupported(notification.getMeterBandSupported()); meterFeature.setMeterCapabilitiesSupported(notification.getMeterCapabilitiesSupported()); meterFeature.setMaxBands(notification.getMaxBands()); meterFeature.setMaxColor(notification.getMaxColor()); meterFeature.setMaxMeter(notification.getMaxMeter()); - - cache.get(notification.getId()).setMeterFeatures(meterFeature.build()); - + //Publish data to configuration data store DataModificationTransaction it = this.statisticsManager.startChange(); NodeKey key = new NodeKey(notification.getId()); NodeRef ref = getNodeRef(key); - - final NodeBuilder nodeData = new NodeBuilder(); + + final NodeBuilder nodeData = new NodeBuilder(); nodeData.setKey(key); - + NodeMeterFeaturesBuilder nodeMeterFeatures= new NodeMeterFeaturesBuilder(); nodeMeterFeatures.setMeterFeatures(meterFeature.build()); - + //Update augmented data nodeData.addAugmentation(NodeMeterFeatures.class, nodeMeterFeatures.build()); - + InstanceIdentifier refValue = ref.getValue(); it.putOperationalData(refValue, nodeData.build()); it.commit(); } - + @Override public void onGroupFeaturesUpdated(GroupFeaturesUpdated notification) { - //Add statistics to local cache - ConcurrentMap cache = this.statisticsManager.getStatisticsCache(); - if(!cache.containsKey(notification.getId())){ - cache.put(notification.getId(), new NodeStatistics()); - } - GroupFeaturesBuilder groupFeatures = new GroupFeaturesBuilder(); groupFeatures.setActions(notification.getActions()); groupFeatures.setGroupCapabilitiesSupported(notification.getGroupCapabilitiesSupported()); groupFeatures.setGroupTypesSupported(notification.getGroupTypesSupported()); groupFeatures.setMaxGroups(notification.getMaxGroups()); - cache.get(notification.getId()).setGroupFeatures(groupFeatures.build()); - + //Publish data to configuration data store DataModificationTransaction it = this.statisticsManager.startChange(); NodeKey key = new NodeKey(notification.getId()); NodeRef ref = getNodeRef(key); - - final NodeBuilder nodeData = new NodeBuilder(); + + final NodeBuilder nodeData = new NodeBuilder(); nodeData.setKey(key); - + NodeGroupFeaturesBuilder nodeGroupFeatures= new NodeGroupFeaturesBuilder(); nodeGroupFeatures.setGroupFeatures(groupFeatures.build()); - + //Update augmented data nodeData.addAugmentation(NodeGroupFeatures.class, nodeGroupFeatures.build()); - + InstanceIdentifier refValue = ref.getValue(); it.putOperationalData(refValue, nodeData.build()); it.commit(); @@ -376,17 +348,17 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList @Override public void onFlowsStatisticsUpdate(FlowsStatisticsUpdate notification) { - + //Check if response is for the request statistics-manager sent. if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null) return; NodeKey key = new NodeKey(notification.getId()); sucLogger.debug("Received flow stats update : {}",notification.toString()); - + for(FlowAndStatisticsMapList map: notification.getFlowAndStatisticsMapList()){ short tableId = map.getTableId(); - + DataModificationTransaction it = this.statisticsManager.startChange(); boolean foundOriginalFlow = false; @@ -416,26 +388,25 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList flow.setPriority(map.getPriority()); flow.setStrict(map.isStrict()); flow.setTableId(tableId); - + Flow flowRule = flow.build(); - + FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder(); stats.setByteCount(map.getByteCount()); stats.setPacketCount(map.getPacketCount()); stats.setDuration(map.getDuration()); - + GenericStatistics flowStats = stats.build(); - + //Add statistics to local cache - ConcurrentMap cache = this.statisticsManager.getStatisticsCache(); + ConcurrentMap cache = this.statisticsManager.getStatisticsCache(); if(!cache.containsKey(notification.getId())){ - cache.put(notification.getId(), new NodeStatistics()); + cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key)); } - if(!cache.get(notification.getId()).getFlowAndStatsMap().containsKey(tableId)){ - cache.get(notification.getId()).getFlowAndStatsMap().put(tableId, new HashMap()); - } - cache.get(notification.getId()).getFlowAndStatsMap().get(tableId).put(flowRule,flowStats); - + NodeStatisticsAger nsa = cache.get(notification.getId()); + FlowEntry flowStatsEntry = nsa.new FlowEntry(tableId,flowRule); + cache.get(notification.getId()).updateFlowStats(flowStatsEntry); + //Augment the data to the flow node FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder(); @@ -460,17 +431,17 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList flowStatistics.setTableId(tableId); flowStatisticsData.setFlowStatistics(flowStatistics.build()); - + sucLogger.debug("Flow : {}",flowRule.toString()); sucLogger.debug("Statistics to augment : {}",flowStatistics.build().toString()); InstanceIdentifier tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key) .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance(); - + Table table= (Table)it.readConfigurationData(tableRef); //TODO: Not a good way to do it, need to figure out better way. - //TODO: major issue in any alternate approach is that flow key is incrementally assigned + //TODO: major issue in any alternate approach is that flow key is incrementally assigned //to the flows stored in data store. if(table != null){ @@ -483,7 +454,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList .child(Flow.class,existingFlow.getKey()).toInstance(); flowBuilder.setKey(existingFlow.getKey()); flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build()); - sucLogger.trace("Found matching flow in the datastore, augmenting statistics"); + sucLogger.info("Found matching flow in the datastore, augmenting statistics"); foundOriginalFlow = true; it.putOperationalData(flowRef, flowBuilder.build()); it.commit(); @@ -491,11 +462,36 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList } } } + + table= (Table)it.readOperationalData(tableRef); + if(!foundOriginalFlow && table != null){ + for(Flow existingFlow : table.getFlow()){ + FlowStatisticsData augmentedflowStatisticsData = existingFlow.getAugmentation(FlowStatisticsData.class); + if(augmentedflowStatisticsData != null){ + FlowBuilder existingOperationalFlow = new FlowBuilder(); + existingOperationalFlow.fieldsFrom(augmentedflowStatisticsData.getFlowStatistics()); + sucLogger.debug("Existing unaccounted flow in operational data store : {}",existingFlow.toString()); + if(flowEquals(flowRule,existingOperationalFlow.build())){ + InstanceIdentifier flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key) + .augmentation(FlowCapableNode.class) + .child(Table.class, new TableKey(tableId)) + .child(Flow.class,existingFlow.getKey()).toInstance(); + flowBuilder.setKey(existingFlow.getKey()); + flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build()); + sucLogger.debug("Found matching flow in the operational datastore, augmenting statistics"); + foundOriginalFlow = true; + it.putOperationalData(flowRef, flowBuilder.build()); + it.commit(); + break; + } + } + } + } if(!foundOriginalFlow){ sucLogger.debug("Associated original flow is not found in data store. Augmenting flow in operational data store"); - //TODO: Temporary fix: format [ 1+tableid+1+unaccounted flow counter] - long flowKey = Long.parseLong(new String("1"+Short.toString(tableId)+"1"+Integer.toString(this.unaccountedFlowsCounter))); + long flowKey = Long.parseLong(new String("1"+Short.toString(tableId)+"0"+Integer.toString(this.unaccountedFlowsCounter))); + this.unaccountedFlowsCounter++; FlowKey newFlowKey = new FlowKey(new FlowId(Long.toString(flowKey))); InstanceIdentifier flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key) .augmentation(FlowCapableNode.class) @@ -503,7 +499,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList .child(Flow.class,newFlowKey).toInstance(); flowBuilder.setKey(newFlowKey); flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build()); - sucLogger.trace("Flow was no present in data store, augmenting statistics as an unaccounted flow"); + sucLogger.info("Flow was no present in data store, augmenting statistics as an unaccounted flow"); it.putOperationalData(flowRef, flowBuilder.build()); it.commit(); } @@ -518,10 +514,10 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList NodeKey key = new NodeKey(notification.getId()); sucLogger.debug("Received aggregate flow statistics update : {}",notification.toString()); - + Short tableId = this.statisticsManager.getMultipartMessageManager().getTableIdForTxId(notification.getTransactionId()); if(tableId != null){ - + DataModificationTransaction it = this.statisticsManager.startChange(); InstanceIdentifier
tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key) @@ -533,13 +529,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList aggregateFlowStatisticsBuilder.setFlowCount(notification.getFlowCount()); aggregateFlowStatisticsBuilder.setPacketCount(notification.getPacketCount()); aggregateFlowStatisticsDataBuilder.setAggregateFlowStatistics(aggregateFlowStatisticsBuilder.build()); - - ConcurrentMap cache = this.statisticsManager.getStatisticsCache(); - if(!cache.containsKey(notification.getId())){ - cache.put(notification.getId(), new NodeStatistics()); - } - cache.get(notification.getId()).getTableAndAggregateFlowStatsMap().put(tableId,aggregateFlowStatisticsBuilder.build()); - + sucLogger.debug("Augment aggregate statistics: {} for table {} on Node {}",aggregateFlowStatisticsBuilder.build().toString(),tableId,key); TableBuilder tableBuilder = new TableBuilder(); @@ -559,20 +549,13 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList NodeKey key = new NodeKey(notification.getId()); sucLogger.debug("Received port stats update : {}",notification.toString()); - - //Add statistics to local cache - ConcurrentMap cache = this.statisticsManager.getStatisticsCache(); - if(!cache.containsKey(notification.getId())){ - cache.put(notification.getId(), new NodeStatistics()); - } - - + List portsStats = notification.getNodeConnectorStatisticsAndPortNumberMap(); for(NodeConnectorStatisticsAndPortNumberMap portStats : portsStats){ - + DataModificationTransaction it = this.statisticsManager.startChange(); - FlowCapableNodeConnectorStatisticsBuilder statisticsBuilder + FlowCapableNodeConnectorStatisticsBuilder statisticsBuilder = new FlowCapableNodeConnectorStatisticsBuilder(); statisticsBuilder.setBytes(portStats.getBytes()); statisticsBuilder.setCollisionCount(portStats.getCollisionCount()); @@ -585,20 +568,17 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList statisticsBuilder.setReceiveOverRunError(portStats.getReceiveOverRunError()); statisticsBuilder.setTransmitDrops(portStats.getTransmitDrops()); statisticsBuilder.setTransmitErrors(portStats.getTransmitErrors()); - - //Update data in the cache - cache.get(notification.getId()).getNodeConnectorStats().put(portStats.getNodeConnectorId(), statisticsBuilder.build()); - + //Augment data to the node-connector - FlowCapableNodeConnectorStatisticsDataBuilder statisticsDataBuilder = + FlowCapableNodeConnectorStatisticsDataBuilder statisticsDataBuilder = new FlowCapableNodeConnectorStatisticsDataBuilder(); - + statisticsDataBuilder.setFlowCapableNodeConnectorStatistics(statisticsBuilder.build()); - + InstanceIdentifier nodeConnectorRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key).child(NodeConnector.class, new NodeConnectorKey(portStats.getNodeConnectorId())).toInstance(); - + NodeConnector nodeConnector = (NodeConnector)it.readOperationalData(nodeConnectorRef); - + if(nodeConnector != null){ sucLogger.debug("Augmenting port statistics {} to port {}",statisticsDataBuilder.build().toString(),nodeConnectorRef.toString()); NodeConnectorBuilder nodeConnectorBuilder = new NodeConnectorBuilder(); @@ -617,32 +597,26 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList NodeKey key = new NodeKey(notification.getId()); sucLogger.debug("Received flow table statistics update : {}",notification.toString()); - + List flowTablesStatsList = notification.getFlowTableAndStatisticsMap(); for (FlowTableAndStatisticsMap ftStats : flowTablesStatsList){ - + DataModificationTransaction it = this.statisticsManager.startChange(); InstanceIdentifier
tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key) .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(ftStats.getTableId().getValue())).toInstance(); - + FlowTableStatisticsDataBuilder statisticsDataBuilder = new FlowTableStatisticsDataBuilder(); - + FlowTableStatisticsBuilder statisticsBuilder = new FlowTableStatisticsBuilder(); statisticsBuilder.setActiveFlows(ftStats.getActiveFlows()); statisticsBuilder.setPacketsLookedUp(ftStats.getPacketsLookedUp()); statisticsBuilder.setPacketsMatched(ftStats.getPacketsMatched()); - + statisticsDataBuilder.setFlowTableStatistics(statisticsBuilder.build()); - - ConcurrentMap cache = this.statisticsManager.getStatisticsCache(); - if(!cache.containsKey(notification.getId())){ - cache.put(notification.getId(), new NodeStatistics()); - } - cache.get(notification.getId()).getFlowTableAndStatisticsMap().put(ftStats.getTableId().getValue(),statisticsBuilder.build()); - + sucLogger.debug("Augment flow table statistics: {} for table {} on Node {}",statisticsBuilder.build().toString(),ftStats.getTableId(),key); - + TableBuilder tableBuilder = new TableBuilder(); tableBuilder.setKey(new TableKey(ftStats.getTableId().getValue())); tableBuilder.addAugmentation(FlowTableStatisticsData.class, statisticsDataBuilder.build()); @@ -653,70 +627,66 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList @Override public void onQueueStatisticsUpdate(QueueStatisticsUpdate notification) { - + //Check if response is for the request statistics-manager sent. if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null) return; NodeKey key = new NodeKey(notification.getId()); sucLogger.debug("Received queue stats update : {}",notification.toString()); - + //Add statistics to local cache - ConcurrentMap cache = this.statisticsManager.getStatisticsCache(); + ConcurrentMap cache = this.statisticsManager.getStatisticsCache(); if(!cache.containsKey(notification.getId())){ - cache.put(notification.getId(), new NodeStatistics()); + cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key)); } - + + NodeStatisticsAger nsa = cache.get(notification.getId()); + List queuesStats = notification.getQueueIdAndStatisticsMap(); for(QueueIdAndStatisticsMap swQueueStats : queuesStats){ - - if(!cache.get(notification.getId()).getNodeConnectorAndQueuesStatsMap().containsKey(swQueueStats.getNodeConnectorId())){ - cache.get(notification.getId()).getNodeConnectorAndQueuesStatsMap().put(swQueueStats.getNodeConnectorId(), new HashMap()); - } - + + QueueEntry queueEntry = nsa.new QueueEntry(swQueueStats.getNodeConnectorId(),swQueueStats.getQueueId()); + nsa.updateQueueStats(queueEntry); + FlowCapableNodeConnectorQueueStatisticsDataBuilder queueStatisticsDataBuilder = new FlowCapableNodeConnectorQueueStatisticsDataBuilder(); - + FlowCapableNodeConnectorQueueStatisticsBuilder queueStatisticsBuilder = new FlowCapableNodeConnectorQueueStatisticsBuilder(); - + queueStatisticsBuilder.fieldsFrom(swQueueStats); - + queueStatisticsDataBuilder.setFlowCapableNodeConnectorQueueStatistics(queueStatisticsBuilder.build()); - - cache.get(notification.getId()).getNodeConnectorAndQueuesStatsMap() - .get(swQueueStats.getNodeConnectorId()) - .put(swQueueStats.getQueueId(), queueStatisticsBuilder.build()); - - + DataModificationTransaction it = this.statisticsManager.startChange(); - InstanceIdentifier queueRef + InstanceIdentifier queueRef = InstanceIdentifier.builder(Nodes.class) .child(Node.class, key) .child(NodeConnector.class, new NodeConnectorKey(swQueueStats.getNodeConnectorId())) .augmentation(FlowCapableNodeConnector.class) .child(Queue.class, new QueueKey(swQueueStats.getQueueId())).toInstance(); - + QueueBuilder queueBuilder = new QueueBuilder(); queueBuilder.addAugmentation(FlowCapableNodeConnectorQueueStatisticsData.class, queueStatisticsDataBuilder.build()); queueBuilder.setKey(new QueueKey(swQueueStats.getQueueId())); - sucLogger.trace("Augmenting queue statistics {} of queue {} to port {}" + sucLogger.info("Augmenting queue statistics {} of queue {} to port {}" ,queueStatisticsDataBuilder.build().toString(), swQueueStats.getQueueId(), swQueueStats.getNodeConnectorId()); - + it.putOperationalData(queueRef, queueBuilder.build()); it.commit(); - + } - + } private NodeRef getNodeRef(NodeKey nodeKey){ InstanceIdentifierBuilder builder = InstanceIdentifier.builder(Nodes.class).child(Node.class, nodeKey); return new NodeRef(builder.toInstance()); } - + public boolean flowEquals(Flow statsFlow, Flow storedFlow) { if (statsFlow.getClass() != storedFlow.getClass()) { return false; @@ -780,28 +750,28 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList } return true; } - + /** * Explicit equals method to compare the 'match' for flows stored in the data-stores and flow fetched from the switch. - * Usecase: e.g If user don't set any ethernet source and destination address for match,data store will store null for + * Usecase: e.g If user don't set any ethernet source and destination address for match,data store will store null for * these address. * e.g [_ethernetMatch=EthernetMatch [_ethernetDestination=null, _ethernetSource=null, _ethernetType= * EthernetType [_type=EtherType [_value=2048], _mask=null, augmentation=[]] - * - * But when you fetch the flows from switch, openflow driver library converts all zero bytes of mac address in the - * message stream to 00:00:00:00:00:00. Following string shows how library interpret the zero mac address bytes and - * eventually when translator convert it to MD-SAL match, this is how it looks - * [_ethernetDestination=EthernetDestination [_address=MacAddress [_value=00:00:00:00:00:00], _mask=null, augmentation=[]], - * _ethernetSource=EthernetSource [_address=MacAddress [_value=00:00:00:00:00:00], _mask=null, augmentation=[]], + * + * But when you fetch the flows from switch, openflow driver library converts all zero bytes of mac address in the + * message stream to 00:00:00:00:00:00. Following string shows how library interpret the zero mac address bytes and + * eventually when translator convert it to MD-SAL match, this is how it looks + * [_ethernetDestination=EthernetDestination [_address=MacAddress [_value=00:00:00:00:00:00], _mask=null, augmentation=[]], + * _ethernetSource=EthernetSource [_address=MacAddress [_value=00:00:00:00:00:00], _mask=null, augmentation=[]], * _ethernetType=EthernetType [_type=EtherType [_value=2048], _mask=null, augmentation=[]] - * - * Similarly for inPort, if user/application don't set any value for it, FRM will store null value for it in data store. + * + * Similarly for inPort, if user/application don't set any value for it, FRM will store null value for it in data store. * When we fetch the same flow (with its statistics) from switch, plugin converts its value to openflow:X:0. - * e.g _inPort=Uri [_value=openflow:1:0] - * + * e.g _inPort=Uri [_value=openflow:1:0] + * * So this custom equals method add additional check to take care of these scenario, in case any match element is null in data-store-flow, but not * in the flow fetched from switch. - * + * * @param statsFlow * @param storedFlow * @return diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateHandler.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateHandler.java new file mode 100644 index 0000000000..f04c29fdd2 --- /dev/null +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateHandler.java @@ -0,0 +1,156 @@ +/* + * Copyright IBM Corporation, 2013. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.statistics.manager; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent; +import org.opendaylight.controller.sal.binding.api.data.DataChangeListener; +import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStats; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics; +import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Following are two main responsibilities of the class + * 1) Listen for the create changes in config data store for tree nodes (Flow,Group,Meter,Queue) + * and send statistics request to the switch to fetch the statistics + * + * 2)Listen for the remove changes in config data store for tree nodes (Flow,Group,Meter,Queue) + * and remove the relative statistics data from operational data store. + * + * @author avishnoi@in.ibm.com + * + */ +public class StatisticsUpdateHandler implements DataChangeListener { + + public final static Logger suhLogger = LoggerFactory.getLogger(StatisticsUpdateHandler.class); + + private final StatisticsProvider statisticsManager; + + public StatisticsUpdateHandler(final StatisticsProvider manager){ + + this.statisticsManager = manager; + } + + public StatisticsProvider getStatisticsManager(){ + return statisticsManager; + } + + @SuppressWarnings("unchecked") + @Override + public void onDataChanged(DataChangeEvent, DataObject> change) { + + Map, DataObject> additions = change.getCreatedConfigurationData(); + for (InstanceIdentifier dataObjectInstance : additions.keySet()) { + DataObject dataObject = additions.get(dataObjectInstance); + InstanceIdentifier nodeII = dataObjectInstance.firstIdentifierOf(Node.class); + NodeRef nodeRef = new NodeRef(nodeII); + if(dataObject instanceof Flow){ + Flow flow = (Flow) dataObject; + try { + this.statisticsManager.sendFlowStatsFromTableRequest(nodeRef, flow); + } catch (InterruptedException | ExecutionException e) { + suhLogger.warn("Following exception occured while sending flow statistics request newly added flow: {}", e); + } + } + if(dataObject instanceof Meter){ + try { + this.statisticsManager.sendMeterConfigStatisticsRequest(nodeRef); + } catch (InterruptedException | ExecutionException e) { + suhLogger.warn("Following exception occured while sending meter statistics request for newly added meter: {}", e); + } + } + if(dataObject instanceof Group){ + try { + this.statisticsManager.sendGroupDescriptionRequest(nodeRef); + } catch (InterruptedException | ExecutionException e) { + suhLogger.warn("Following exception occured while sending group description request for newly added group: {}", e); + } + } + if(dataObject instanceof Queue){ + Queue queue = (Queue) dataObject; + InstanceIdentifier nodeConnectorII = dataObjectInstance.firstIdentifierOf(NodeConnector.class); + NodeConnectorKey nodeConnectorKey = InstanceIdentifier.keyOf(nodeConnectorII); + try { + this.statisticsManager.sendQueueStatsFromGivenNodeConnector(nodeRef, nodeConnectorKey.getId(), queue.getQueueId()); + } catch (InterruptedException | ExecutionException e) { + suhLogger.warn("Following exception occured while sending queue statistics request for newly added group: {}", e); + } + } + } + + Set> removals = change.getRemovedConfigurationData(); + for (InstanceIdentifier dataObjectInstance : removals) { + DataObject dataObject = change.getOriginalConfigurationData().get(dataObjectInstance); + + if(dataObject instanceof Flow){ + InstanceIdentifier flowII = (InstanceIdentifier)dataObjectInstance; + InstanceIdentifier flowAugmentation = + InstanceIdentifier.builder(flowII).augmentation(FlowStatisticsData.class).toInstance(); + removeAugmentedOperationalData(flowAugmentation); + } + if(dataObject instanceof Meter){ + InstanceIdentifier meterII = (InstanceIdentifier)dataObjectInstance; + + InstanceIdentifier nodeMeterConfigStatsAugmentation = + InstanceIdentifier.builder(meterII).augmentation(NodeMeterConfigStats.class).toInstance(); + removeAugmentedOperationalData(nodeMeterConfigStatsAugmentation); + + InstanceIdentifier nodeMeterStatisticsAugmentation = + InstanceIdentifier.builder(meterII).augmentation(NodeMeterStatistics.class).toInstance(); + removeAugmentedOperationalData(nodeMeterStatisticsAugmentation); + } + + if(dataObject instanceof Group){ + InstanceIdentifier groupII = (InstanceIdentifier)dataObjectInstance; + + InstanceIdentifier nodeGroupDescStatsAugmentation = + InstanceIdentifier.builder(groupII).augmentation(NodeGroupDescStats.class).toInstance(); + removeAugmentedOperationalData(nodeGroupDescStatsAugmentation); + + InstanceIdentifier nodeGroupStatisticsAugmentation = + InstanceIdentifier.builder(groupII).augmentation(NodeGroupStatistics.class).toInstance(); + removeAugmentedOperationalData(nodeGroupStatisticsAugmentation); + } + + if(dataObject instanceof Queue){ + InstanceIdentifier queueII = (InstanceIdentifier)dataObjectInstance; + + InstanceIdentifier nodeConnectorQueueStatisticsDataAugmentation = + InstanceIdentifier.builder(queueII).augmentation(FlowCapableNodeConnectorQueueStatisticsData.class).toInstance(); + removeAugmentedOperationalData(nodeConnectorQueueStatisticsDataAugmentation); + } + } + } + + private void removeAugmentedOperationalData(InstanceIdentifier dataObjectInstance ){ + if(dataObjectInstance != null){ + DataModificationTransaction it = this.statisticsManager.startChange(); + it.removeOperationalData(dataObjectInstance); + it.commit(); + } + } +} -- 2.36.6