1) Fix for bug 284. Added functionality that listen to data store *remove* changes and
cleanup the relevant statistics from operational data store.
2) Added functionality that listen to config data store *create* changes for Flow,Group,
Meter & Queue and send statistics request to switch to get respective stats.
3) Added functionality to periodically remove stale stats from operational data store.
Clean up thread invokes after every two cycle of stats collection.
4) Removed unnecessary local caching.
Change-Id: Ibee3c73905ce872302c4f54ce5b7b53c0657ee51
Signed-off-by: Anil Vishnoi <avishnoi@in.ibm.com>
+++ /dev/null
-/*
- * Copyright IBM Corporation, 2013. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.statistics.manager;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeatures;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.MeterFeatures;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericQueueStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericTableStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.NodeConnectorStatistics;
-
-public class NodeStatistics {
-
- private NodeRef targetNode;
-
- private List<GroupStats> groupStatistics;
-
- private List<MeterStats> meterStatistics;
-
- private List<GroupDescStats> groupDescStats;
-
- private List<MeterConfigStats> meterConfigStats;
-
- private GroupFeatures groupFeatures;
-
- private MeterFeatures meterFeatures;
-
- private final Map<Short,Map<Flow,GenericStatistics>> flowAndStatsMap=
- new HashMap<Short,Map<Flow,GenericStatistics>>();
-
- private final Map<Short,AggregateFlowStatistics> tableAndAggregateFlowStatsMap =
- new HashMap<Short,AggregateFlowStatistics>();
-
- private final Map<NodeConnectorId,NodeConnectorStatistics> nodeConnectorStats =
- new ConcurrentHashMap<NodeConnectorId,NodeConnectorStatistics>();
-
- private final Map<Short,GenericTableStatistics> flowTableAndStatisticsMap =
- new HashMap<Short,GenericTableStatistics>();
-
- private final Map<NodeConnectorId,Map<QueueId,GenericQueueStatistics>> NodeConnectorAndQueuesStatsMap =
- new HashMap<NodeConnectorId,Map<QueueId,GenericQueueStatistics>>();
-
- public NodeStatistics(){
-
- }
-
- public NodeRef getTargetNode() {
- return targetNode;
- }
-
- public void setTargetNode(NodeRef targetNode) {
- this.targetNode = targetNode;
- }
-
- public List<GroupStats> getGroupStatistics() {
- return groupStatistics;
- }
-
- public void setGroupStatistics(List<GroupStats> groupStatistics) {
- this.groupStatistics = groupStatistics;
- }
-
- public List<MeterStats> getMeterStatistics() {
- return meterStatistics;
- }
-
- public void setMeterStatistics(List<MeterStats> meterStatistics) {
- this.meterStatistics = meterStatistics;
- }
-
- public List<GroupDescStats> getGroupDescStats() {
- return groupDescStats;
- }
-
- public void setGroupDescStats(List<GroupDescStats> groupDescStats) {
- this.groupDescStats = groupDescStats;
- }
-
- public List<MeterConfigStats> getMeterConfigStats() {
- return meterConfigStats;
- }
-
- public void setMeterConfigStats(List<MeterConfigStats> meterConfigStats) {
- this.meterConfigStats = meterConfigStats;
- }
-
- public GroupFeatures getGroupFeatures() {
- return groupFeatures;
- }
-
- public void setGroupFeatures(GroupFeatures groupFeatures) {
- this.groupFeatures = groupFeatures;
- }
-
- public MeterFeatures getMeterFeatures() {
- return meterFeatures;
- }
-
- public void setMeterFeatures(MeterFeatures meterFeatures) {
- this.meterFeatures = meterFeatures;
- }
-
- public Map<Short,Map<Flow,GenericStatistics>> getFlowAndStatsMap() {
- return flowAndStatsMap;
- }
-
- public Map<Short, GenericTableStatistics> getFlowTableAndStatisticsMap() {
- return flowTableAndStatisticsMap;
- }
-
- public Map<Short, AggregateFlowStatistics> getTableAndAggregateFlowStatsMap() {
- return tableAndAggregateFlowStatsMap;
- }
- public Map<NodeConnectorId, NodeConnectorStatistics> getNodeConnectorStats() {
- return nodeConnectorStats;
- }
-
- public Map<NodeConnectorId, Map<QueueId, GenericQueueStatistics>> getNodeConnectorAndQueuesStatsMap() {
- return NodeConnectorAndQueuesStatsMap;
- }
-}
--- /dev/null
+/*
+ * Copyright IBM Corporation, 2013. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
+
+/**
+ * Main responsibility of this class to clean up all the stale statistics data
+ * associated to Flow,Meter,Group,Queue.
+ * @author avishnoi@in.ibm.com
+ *
+ */
+public class NodeStatisticsAger {
+
+ private final int NUMBER_OF_WAIT_CYCLES =2;
+
+ private final StatisticsProvider statisticsProvider;
+
+ private final NodeKey targetNodeKey;
+
+ private final Map<GroupDescStats,Date> groupDescStatsUpdate
+ = new ConcurrentHashMap<GroupDescStats,Date>();
+
+ private final Map<MeterConfigStats,Date> meterConfigStatsUpdate
+ = new ConcurrentHashMap<MeterConfigStats,Date>();
+
+ private final Map<FlowEntry,Date> flowStatsUpdate
+ = new ConcurrentHashMap<FlowEntry,Date>();
+
+ private final Map<QueueEntry,Date> queuesStatsUpdate
+ = new ConcurrentHashMap<QueueEntry,Date>();
+
+ public NodeStatisticsAger(StatisticsProvider statisticsProvider, NodeKey nodeKey){
+ this.targetNodeKey = nodeKey;
+ this.statisticsProvider = statisticsProvider;
+ }
+
+ public class FlowEntry{
+ private final Short tableId;
+ private final Flow flow;
+
+ public FlowEntry(Short tableId, Flow flow){
+ this.tableId = tableId;
+ this.flow = flow;
+ }
+
+ public Short getTableId() {
+ return tableId;
+ }
+
+ public Flow getFlow() {
+ return flow;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + getOuterType().hashCode();
+ result = prime * result + ((flow == null) ? 0 : flow.hashCode());
+ result = prime * result + ((tableId == null) ? 0 : tableId.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ FlowEntry other = (FlowEntry) obj;
+ if (!getOuterType().equals(other.getOuterType()))
+ return false;
+ if (flow == null) {
+ if (other.flow != null)
+ return false;
+ } else if (!flow.equals(other.flow))
+ return false;
+ if (tableId == null) {
+ if (other.tableId != null)
+ return false;
+ } else if (!tableId.equals(other.tableId))
+ return false;
+ return true;
+ }
+
+ private NodeStatisticsAger getOuterType() {
+ return NodeStatisticsAger.this;
+ }
+
+ }
+
+ public class QueueEntry{
+ private final NodeConnectorId nodeConnectorId;
+ private final QueueId queueId;
+ public QueueEntry(NodeConnectorId ncId, QueueId queueId){
+ this.nodeConnectorId = ncId;
+ this.queueId = queueId;
+ }
+ public NodeConnectorId getNodeConnectorId() {
+ return nodeConnectorId;
+ }
+ public QueueId getQueueId() {
+ return queueId;
+ }
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + getOuterType().hashCode();
+ result = prime * result + ((nodeConnectorId == null) ? 0 : nodeConnectorId.hashCode());
+ result = prime * result + ((queueId == null) ? 0 : queueId.hashCode());
+ return result;
+ }
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof QueueEntry)) {
+ return false;
+ }
+ QueueEntry other = (QueueEntry) obj;
+ if (!getOuterType().equals(other.getOuterType())) {
+ return false;
+ }
+ if (nodeConnectorId == null) {
+ if (other.nodeConnectorId != null) {
+ return false;
+ }
+ } else if (!nodeConnectorId.equals(other.nodeConnectorId)) {
+ return false;
+ }
+ if (queueId == null) {
+ if (other.queueId != null) {
+ return false;
+ }
+ } else if (!queueId.equals(other.queueId)) {
+ return false;
+ }
+ return true;
+ }
+ private NodeStatisticsAger getOuterType() {
+ return NodeStatisticsAger.this;
+ }
+ }
+
+ public NodeKey getTargetNodeKey() {
+ return targetNodeKey;
+ }
+
+ public Map<GroupDescStats, Date> getGroupDescStatsUpdate() {
+ return groupDescStatsUpdate;
+ }
+
+ public Map<MeterConfigStats, Date> getMeterConfigStatsUpdate() {
+ return meterConfigStatsUpdate;
+ }
+
+ public Map<FlowEntry, Date> getFlowStatsUpdate() {
+ return flowStatsUpdate;
+ }
+
+ public Map<QueueEntry, Date> getQueuesStatsUpdate() {
+ return queuesStatsUpdate;
+ }
+
+ public void updateGroupDescStats(List<GroupDescStats> list){
+ Date expiryTime = getExpiryTime();
+ for(GroupDescStats groupDescStats : list)
+ this.groupDescStatsUpdate.put(groupDescStats, expiryTime);
+ }
+
+ public void updateMeterConfigStats(List<MeterConfigStats> list){
+ Date expiryTime = getExpiryTime();
+ for(MeterConfigStats meterConfigStats: list)
+ this.meterConfigStatsUpdate.put(meterConfigStats, expiryTime);
+ }
+
+ public void updateFlowStats(FlowEntry flowEntry){
+ this.flowStatsUpdate.put(flowEntry, getExpiryTime());
+ }
+ public void updateQueueStats(QueueEntry queueEntry){
+ this.queuesStatsUpdate.put(queueEntry, getExpiryTime());
+ }
+
+ private Date getExpiryTime(){
+ Date expires = new Date();
+ expires.setTime(expires.getTime()+StatisticsProvider.STATS_THREAD_EXECUTION_TIME*NUMBER_OF_WAIT_CYCLES);
+ return expires;
+ }
+
+ public void cleanStaleStatistics(){
+ //Clean stale statistics related to group
+ for (Iterator<GroupDescStats> it = this.groupDescStatsUpdate.keySet().iterator();it.hasNext();){
+ GroupDescStats groupDescStats = it.next();
+ Date now = new Date();
+ Date expiryTime = this.groupDescStatsUpdate.get(groupDescStats);
+ if(now.after(expiryTime)){
+ cleanGroupStatsFromDataStore(groupDescStats );
+ it.remove();
+ }
+ }
+
+ //Clean stale statistics related to meter
+ for (Iterator<MeterConfigStats> it = this.meterConfigStatsUpdate.keySet().iterator();it.hasNext();){
+ MeterConfigStats meterConfigStats = it.next();
+ Date now = new Date();
+ Date expiryTime = this.meterConfigStatsUpdate.get(meterConfigStats);
+ if(now.after(expiryTime)){
+ cleanMeterStatsFromDataStore(meterConfigStats);
+ it.remove();
+ }
+ }
+
+ //Clean stale statistics related to flow
+ for (Iterator<FlowEntry> it = this.flowStatsUpdate.keySet().iterator();it.hasNext();){
+ FlowEntry flowEntry = it.next();
+ Date now = new Date();
+ Date expiryTime = this.flowStatsUpdate.get(flowEntry);
+ if(now.after(expiryTime)){
+ cleanFlowStatsFromDataStore(flowEntry);
+ it.remove();
+ }
+ }
+
+ //Clean stale statistics related to queue
+ for (Iterator<QueueEntry> it = this.queuesStatsUpdate.keySet().iterator();it.hasNext();){
+ QueueEntry queueEntry = it.next();
+ Date now = new Date();
+ Date expiryTime = this.queuesStatsUpdate.get(queueEntry);
+ if(now.after(expiryTime)){
+ cleanQueueStatsFromDataStore(queueEntry);
+ it.remove();
+ }
+ }
+
+ }
+
+ private void cleanQueueStatsFromDataStore(QueueEntry queueEntry) {
+ InstanceIdentifier<?> queueRef
+ = InstanceIdentifier.builder(Nodes.class)
+ .child(Node.class, this.targetNodeKey)
+ .child(NodeConnector.class, new NodeConnectorKey(queueEntry.getNodeConnectorId()))
+ .augmentation(FlowCapableNodeConnector.class)
+ .child(Queue.class, new QueueKey(queueEntry.getQueueId()))
+ .augmentation(FlowCapableNodeConnectorQueueStatisticsData.class).toInstance();
+ cleanStaleStatisticsFromDataStore(queueRef);
+ }
+
+ private void cleanFlowStatsFromDataStore(FlowEntry flowEntry) {
+ InstanceIdentifier<?> flowRef
+ = InstanceIdentifier.builder(Nodes.class).child(Node.class, this.targetNodeKey)
+ .augmentation(FlowCapableNode.class)
+ .child(Table.class, new TableKey(flowEntry.getTableId()))
+ .child(Flow.class,flowEntry.getFlow().getKey())
+ .augmentation(FlowStatisticsData.class).toInstance();
+
+ cleanStaleStatisticsFromDataStore(flowRef);
+
+ }
+
+ private void cleanMeterStatsFromDataStore(MeterConfigStats meterConfigStats) {
+ InstanceIdentifierBuilder<Meter> meterRef
+ = InstanceIdentifier.builder(Nodes.class).child(Node.class,this.targetNodeKey)
+ .augmentation(FlowCapableNode.class)
+ .child(Meter.class,new MeterKey(meterConfigStats.getMeterId()));
+
+ InstanceIdentifier<?> nodeMeterConfigStatsAugmentation = meterRef.augmentation(NodeMeterConfigStats.class).toInstance();
+
+ cleanStaleStatisticsFromDataStore(nodeMeterConfigStatsAugmentation);
+
+ InstanceIdentifier<?> nodeMeterStatisticsAugmentation = meterRef.augmentation(NodeMeterStatistics.class).toInstance();
+
+ cleanStaleStatisticsFromDataStore(nodeMeterStatisticsAugmentation);
+
+ }
+
+ private void cleanGroupStatsFromDataStore(GroupDescStats groupDescStats) {
+ InstanceIdentifierBuilder<Group> groupRef
+ = InstanceIdentifier.builder(Nodes.class).child(Node.class,this.targetNodeKey)
+ .augmentation(FlowCapableNode.class)
+ .child(Group.class,new GroupKey(groupDescStats.getGroupId()));
+
+ InstanceIdentifier<?> nodeGroupDescStatsAugmentation = groupRef.augmentation(NodeGroupDescStats.class).toInstance();
+
+ cleanStaleStatisticsFromDataStore(nodeGroupDescStatsAugmentation);
+
+ InstanceIdentifier<?> nodeGroupStatisticsAugmentation = groupRef.augmentation(NodeGroupStatistics.class).toInstance();
+
+ cleanStaleStatisticsFromDataStore(nodeGroupStatisticsAugmentation);
+ }
+
+ private void cleanStaleStatisticsFromDataStore(InstanceIdentifier<? extends DataObject> ii){
+ if(ii != null){
+ DataModificationTransaction it = this.statisticsProvider.startChange();
+ it.removeOperationalData(ii);
+ it.commit();
+ }
+ }
+}
import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareProvider;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.osgi.framework.BundleContext;
pSession = session;
DataProviderService dps = session.<DataProviderService>getSALService(DataProviderService.class);
StatisticsManagerActivator.statsProvider.setDataService(dps);
+ DataBrokerService dbs = session.<DataBrokerService>getSALService(DataBrokerService.class);
+ StatisticsManagerActivator.statsProvider.setDataBrokerService(dbs);
NotificationProviderService nps = session.<NotificationProviderService>getSALService(NotificationProviderService.class);
StatisticsManagerActivator.statsProvider.setNotificationService(nps);
StatisticsManagerActivator.statsProvider.start();
import org.eclipse.xtext.xbase.lib.Exceptions;
import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.NotificationListener;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Following are main responsibilities of the class:
+ * 1) Invoke statistics request thread to send periodic statistics request to all the
+ * flow capable switch connected to the controller. It sends statistics request for
+ * Group,Meter,Table,Flow,Queue,Aggregate stats.
+ *
+ * 2) Invoke statistics ager thread, to clean up all the stale statistics data from
+ * operational data store.
+ *
+ * @author avishnoi@in.ibm.com
+ *
+ */
public class StatisticsProvider implements AutoCloseable {
public final static Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
-
+
private DataProviderService dps;
+
+ private DataBrokerService dbs;
private NotificationProviderService nps;
-
+
private OpendaylightGroupStatisticsService groupStatsService;
-
+
private OpendaylightMeterStatisticsService meterStatsService;
-
+
private OpendaylightFlowStatisticsService flowStatsService;
-
+
private OpendaylightPortStatisticsService portStatsService;
private OpendaylightFlowTableStatisticsService flowTableStatsService;
private OpendaylightQueueStatisticsService queueStatsService;
private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
-
+
+ private StatisticsUpdateHandler statsUpdateHandler;
+
private Thread statisticsRequesterThread;
+
+ private Thread statisticsAgerThread;
private final InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
-
- private final int STATS_THREAD_EXECUTION_TIME= 50000;
+
+ public static final int STATS_THREAD_EXECUTION_TIME= 30000;
//Local caching of stats
-
- private final ConcurrentMap<NodeId,NodeStatistics> statisticsCache =
- new ConcurrentHashMap<NodeId,NodeStatistics>();
-
+
+ private final ConcurrentMap<NodeId,NodeStatisticsAger> statisticsCache =
+ new ConcurrentHashMap<NodeId,NodeStatisticsAger>();
+
public DataProviderService getDataService() {
return this.dps;
}
-
+
public void setDataService(final DataProviderService dataService) {
this.dps = dataService;
}
+
+ public DataBrokerService getDataBrokerService() {
+ return this.dbs;
+ }
+
+ public void setDataBrokerService(final DataBrokerService dataBrokerService) {
+ this.dbs = dataBrokerService;
+ }
public NotificationProviderService getNotificationService() {
return this.nps;
}
-
+
public void setNotificationService(final NotificationProviderService notificationService) {
this.nps = notificationService;
}
}
private final StatisticsUpdateCommiter updateCommiter = new StatisticsUpdateCommiter(StatisticsProvider.this);
-
+
private Registration<NotificationListener> listenerRegistration;
-
+
public void start() {
-
+
NotificationProviderService nps = this.getNotificationService();
Registration<NotificationListener> registerNotificationListener = nps.registerNotificationListener(this.updateCommiter);
this.listenerRegistration = registerNotificationListener;
-
+
+ statsUpdateHandler = new StatisticsUpdateHandler(StatisticsProvider.this);
+
+ registerDataStoreUpdateListener(this.getDataBrokerService());
+
// Get Group/Meter statistics service instance
groupStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightGroupStatisticsService.class);
-
+
meterStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightMeterStatisticsService.class);
-
+
flowStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightFlowStatisticsService.class);
flowTableStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightFlowTableStatisticsService.class);
-
+
queueStatsService = StatisticsManagerActivator.getProviderContext().
getRpcService(OpendaylightQueueStatisticsService.class);
-
+
statisticsRequesterThread = new Thread( new Runnable(){
@Override
while(true){
try {
statsRequestSender();
-
+
Thread.sleep(STATS_THREAD_EXECUTION_TIME);
}catch (Exception e){
spLogger.error("Exception occurred while sending stats request : {}",e);
}
}
});
-
+
spLogger.debug("Statistics requester thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
-
+
statisticsRequesterThread.start();
+
+ statisticsAgerThread = new Thread( new Runnable(){
+
+ @Override
+ public void run() {
+ while(true){
+ try {
+ for(NodeStatisticsAger nodeStatisticsAger : statisticsCache.values()){
+ nodeStatisticsAger.cleanStaleStatistics();
+ }
+
+ Thread.sleep(STATS_THREAD_EXECUTION_TIME);
+ }catch (Exception e){
+ spLogger.error("Exception occurred while sending stats request : {}",e);
+ }
+ }
+ }
+ });
+
+ spLogger.debug("Statistics ager thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
+ statisticsAgerThread.start();
+
spLogger.info("Statistics Provider started.");
}
+
+ private void registerDataStoreUpdateListener(DataBrokerService dbs) {
+ //Register for flow updates
+ InstanceIdentifier<? extends DataObject> pathFlow = InstanceIdentifier.builder(Nodes.class).child(Node.class)
+ .augmentation(FlowCapableNode.class)
+ .child(Table.class)
+ .child(Flow.class).toInstance();
+ dbs.registerDataChangeListener(pathFlow, statsUpdateHandler);
+
+ //Register for meter updates
+ InstanceIdentifier<? extends DataObject> pathMeter = InstanceIdentifier.builder(Nodes.class).child(Node.class)
+ .augmentation(FlowCapableNode.class)
+ .child(Meter.class).toInstance();
+
+ dbs.registerDataChangeListener(pathMeter, statsUpdateHandler);
+
+ //Register for group updates
+ InstanceIdentifier<? extends DataObject> pathGroup = InstanceIdentifier.builder(Nodes.class).child(Node.class)
+ .augmentation(FlowCapableNode.class)
+ .child(Group.class).toInstance();
+ dbs.registerDataChangeListener(pathGroup, statsUpdateHandler);
+
+ //Register for queue updates
+ InstanceIdentifier<? extends DataObject> pathQueue = InstanceIdentifier.builder(Nodes.class).child(Node.class)
+ .child(NodeConnector.class)
+ .augmentation(FlowCapableNodeConnector.class)
+ .child(Queue.class).toInstance();
+ dbs.registerDataChangeListener(pathQueue, statsUpdateHandler);
+ }
protected DataModificationTransaction startChange() {
-
+
DataProviderService dps = this.getDataService();
return dps.beginTransaction();
}
-
+
private void statsRequestSender(){
-
+
List<Node> targetNodes = getAllConnectedNodes();
-
+
if(targetNodes == null)
return;
-
+
for (Node targetNode : targetNodes){
-
+
if(targetNode.getAugmentation(FlowCapableNode.class) != null){
- spLogger.trace("Send request for stats collection to node : {})",targetNode.getId());
-
+ spLogger.info("Send request for stats collection to node : {})",targetNode.getId());
+
InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
-
+
NodeRef targetNodeRef = new NodeRef(targetInstanceId);
-
+
try{
sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
-
+
sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
-
+
sendAllFlowTablesStatisticsRequest(targetNodeRef);
-
+
sendAllQueueStatsFromAllNodeConnector (targetNodeRef);
sendAllGroupStatisticsRequest(targetNodeRef);
-
+
sendAllMeterStatisticsRequest(targetNodeRef);
-
+
sendGroupDescriptionRequest(targetNodeRef);
-
+
sendMeterConfigStatisticsRequest(targetNodeRef);
}catch(Exception e){
spLogger.error("Exception occured while sending statistics requests : {}", e);
}
}
- private void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
- final GetFlowTablesStatisticsInputBuilder input =
+ public void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
+ final GetFlowTablesStatisticsInputBuilder input =
new GetFlowTablesStatisticsInputBuilder();
-
+
input.setNode(targetNodeRef);
- Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
+ Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
flowTableStatsService.getFlowTablesStatistics(input.build());
this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
}
- private void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+ public void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
-
+
input.setNode(targetNode);
-
- Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
+
+ Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
-
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
, StatsRequestType.ALL_FLOW);
-
+
+ }
+
+ public void sendFlowStatsFromTableRequest(NodeRef targetNode,Flow flow) throws InterruptedException, ExecutionException{
+ final GetFlowStatisticsFromFlowTableInputBuilder input =
+ new GetFlowStatisticsFromFlowTableInputBuilder();
+
+ input.setNode(targetNode);
+ input.fieldsFrom(flow);
+
+ Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response =
+ flowStatsService.getFlowStatisticsFromFlowTable(input.build());
+
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ , StatsRequestType.ALL_FLOW);
+
}
- private void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
-
+ public void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
+
List<Short> tablesId = getTablesFromNode(targetNodeKey);
-
+
if(tablesId.size() != 0){
for(Short id : tablesId){
-
- spLogger.trace("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey);
- GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
+
+ spLogger.info("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey);
+ GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
-
+
input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(id));
- Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
+ Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
-
+
multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), id);
this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
, StatsRequestType.AGGR_FLOW);
}
}
- private void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
+ public void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+
final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
-
+
input.setNode(targetNode);
- Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
+ Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
portStatsService.getAllNodeConnectorsStatistics(input.build());
this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
, StatsRequestType.ALL_PORT);
}
- private void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
+ public void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+
final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
-
+
input.setNode(targetNode);
- Future<RpcResult<GetAllGroupStatisticsOutput>> response =
+ Future<RpcResult<GetAllGroupStatisticsOutput>> response =
groupStatsService.getAllGroupStatistics(input.build());
-
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
, StatsRequestType.ALL_GROUP);
}
-
- private void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+
+ public void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
-
+
input.setNode(targetNode);
- Future<RpcResult<GetGroupDescriptionOutput>> response =
+ Future<RpcResult<GetGroupDescriptionOutput>> response =
groupStatsService.getGroupDescription(input.build());
this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
, StatsRequestType.GROUP_DESC);
}
-
- private void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
+
+ public void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+
GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
-
+
input.setNode(targetNode);
- Future<RpcResult<GetAllMeterStatisticsOutput>> response =
+ Future<RpcResult<GetAllMeterStatisticsOutput>> response =
meterStatsService.getAllMeterStatistics(input.build());
-
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
, StatsRequestType.ALL_METER);;
}
-
- private void sendMeterConfigStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
-
+
+ public void sendMeterConfigStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+
GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
-
+
input.setNode(targetNode);
- Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
+ Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
meterStatsService.getAllMeterConfigStatistics(input.build());
-
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
, StatsRequestType.METER_CONFIG);;
}
-
- private void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
+
+ public void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
-
+
input.setNode(targetNode);
-
- Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
+
+ Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
+
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
+ , StatsRequestType.ALL_QUEUE_STATS);;
+ }
+
+ public void sendQueueStatsFromGivenNodeConnector(NodeRef targetNode,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
+ GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder();
+
+ input.setNode(targetNode);
+ input.setNodeConnectorId(nodeConnectorId);
+ input.setQueueId(queueId);
+ Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response =
+ queueStatsService.getQueueStatisticsFromGivenPort(input.build());
+
this.multipartMessageManager.addTxIdToRequestTypeEntry(response.get().getResult().getTransactionId()
, StatsRequestType.ALL_QUEUE_STATS);;
}
- public ConcurrentMap<NodeId, NodeStatistics> getStatisticsCache() {
+ public ConcurrentMap<NodeId, NodeStatisticsAger> getStatisticsCache() {
return statisticsCache;
}
-
+
private List<Node> getAllConnectedNodes(){
-
+
Nodes nodes = (Nodes) dps.readOperationalData(nodesIdentifier);
if(nodes == null)
return null;
-
- spLogger.trace("Number of connected nodes : {}",nodes.getNode().size());
+
+ spLogger.info("Number of connected nodes : {}",nodes.getNode().size());
return nodes.getNode();
}
-
+
private List<Short> getTablesFromNode(NodeKey nodeKey){
InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
-
+
FlowCapableNode node = (FlowCapableNode)dps.readOperationalData(nodesIdentifier);
List<Short> tablesId = new ArrayList<Short>();
if(node != null && node.getTable()!=null){
- spLogger.trace("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
+ spLogger.info("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
for(Table table: node.getTable()){
tablesId.add(table.getId());
}
@SuppressWarnings("deprecation")
@Override
public void close(){
-
+
try {
- spLogger.trace("Statistics Provider stopped.");
+ spLogger.info("Statistics Provider stopped.");
if (this.listenerRegistration != null) {
-
+
this.listenerRegistration.close();
-
+
this.statisticsRequesterThread.destroy();
-
+
+ this.statisticsAgerThread.destroy();
+
}
} catch (Throwable e) {
throw Exceptions.sneakyThrow(e);
*/
package org.opendaylight.controller.md.statistics.manager;
-import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
+import org.opendaylight.controller.md.statistics.manager.NodeStatisticsAger.FlowEntry;
+import org.opendaylight.controller.md.statistics.manager.NodeStatisticsAger.QueueEntry;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.QueueKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupDescStatsUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupFeaturesUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterStatisticsBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericQueueStatistics;
import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
import org.slf4j.LoggerFactory;
/**
- * Class implement statistics manager related listener interface and augment all the
+ * Class implement statistics manager related listener interface and augment all the
* received statistics data to data stores.
- * TODO: Need to add error message listener and clean-up the associated tx id
+ * TODO: Need to add error message listener and clean-up the associated tx id
* if it exists in the tx-id cache.
* @author vishnoianil
*
*/
public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsListener,
- OpendaylightMeterStatisticsListener,
+ OpendaylightMeterStatisticsListener,
OpendaylightFlowStatisticsListener,
OpendaylightPortStatisticsListener,
OpendaylightFlowTableStatisticsListener,
OpendaylightQueueStatisticsListener{
-
+
public final static Logger sucLogger = LoggerFactory.getLogger(StatisticsUpdateCommiter.class);
private final StatisticsProvider statisticsManager;
-
- private final int unaccountedFlowsCounter = 1;
+
+ private int unaccountedFlowsCounter = 1;
public StatisticsUpdateCommiter(final StatisticsProvider manager){
this.statisticsManager = manager;
}
-
+
public StatisticsProvider getStatisticsManager(){
return statisticsManager;
}
-
+
@Override
public void onMeterConfigStatsUpdated(MeterConfigStatsUpdated notification) {
//Check if response is for the request statistics-manager sent.
if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
return;
+
+ NodeKey key = new NodeKey(notification.getId());
//Add statistics to local cache
- ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
+ ConcurrentMap<NodeId, NodeStatisticsAger> cache = this.statisticsManager.getStatisticsCache();
if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatistics());
+ cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key));
}
- cache.get(notification.getId()).setMeterConfigStats(notification.getMeterConfigStats());
-
+ cache.get(notification.getId()).updateMeterConfigStats(notification.getMeterConfigStats());
+
//Publish data to configuration data store
- NodeKey key = new NodeKey(notification.getId());
-
- List<MeterConfigStats> eterConfigStatsList = notification.getMeterConfigStats();
-
- for(MeterConfigStats meterConfigStats : eterConfigStatsList){
+ List<MeterConfigStats> meterConfigStatsList = notification.getMeterConfigStats();
+
+ for(MeterConfigStats meterConfigStats : meterConfigStatsList){
DataModificationTransaction it = this.statisticsManager.startChange();
MeterBuilder meterBuilder = new MeterBuilder();
MeterKey meterKey = new MeterKey(meterConfigStats.getMeterId());
meterBuilder.setKey(meterKey);
-
+
InstanceIdentifier<Meter> meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key)
.augmentation(FlowCapableNode.class)
.child(Meter.class,meterKey).toInstance();
-
+
NodeMeterConfigStatsBuilder meterConfig= new NodeMeterConfigStatsBuilder();
MeterConfigStatsBuilder stats = new MeterConfigStatsBuilder();
stats.fieldsFrom(meterConfigStats);
meterConfig.setMeterConfigStats(stats.build());
-
+
//Update augmented data
meterBuilder.addAugmentation(NodeMeterConfigStats.class, meterConfig.build());
it.putOperationalData(meterRef, meterBuilder.build());
@Override
public void onMeterStatisticsUpdated(MeterStatisticsUpdated notification) {
-
+
//Check if response is for the request statistics-manager sent.
if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
return;
- //Add statistics to local cache
- ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
- if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatistics());
- }
- cache.get(notification.getId()).setMeterStatistics(notification.getMeterStats());
-
NodeKey key = new NodeKey(notification.getId());
-
+
List<MeterStats> meterStatsList = notification.getMeterStats();
-
+
for(MeterStats meterStats : meterStatsList){
//Publish data to configuration data store
MeterBuilder meterBuilder = new MeterBuilder();
MeterKey meterKey = new MeterKey(meterStats.getMeterId());
meterBuilder.setKey(meterKey);
-
+
InstanceIdentifier<Meter> meterRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key)
.augmentation(FlowCapableNode.class)
.child(Meter.class,meterKey).toInstance();
-
+
NodeMeterStatisticsBuilder meterStatsBuilder= new NodeMeterStatisticsBuilder();
MeterStatisticsBuilder stats = new MeterStatisticsBuilder();
stats.fieldsFrom(meterStats);
@Override
public void onGroupDescStatsUpdated(GroupDescStatsUpdated notification) {
-
+
//Check if response is for the request statistics-manager sent.
if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
return;
+ NodeKey key = new NodeKey(notification.getId());
+
//Add statistics to local cache
- ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
+ ConcurrentMap<NodeId, NodeStatisticsAger> cache = this.statisticsManager.getStatisticsCache();
if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatistics());
+ cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key));
}
- cache.get(notification.getId()).setGroupDescStats(notification.getGroupDescStats());
-
+ cache.get(notification.getId()).updateGroupDescStats(notification.getGroupDescStats());
+
//Publish data to configuration data store
- NodeKey key = new NodeKey(notification.getId());
List<GroupDescStats> groupDescStatsList = notification.getGroupDescStats();
for(GroupDescStats groupDescStats : groupDescStatsList){
DataModificationTransaction it = this.statisticsManager.startChange();
-
+
GroupBuilder groupBuilder = new GroupBuilder();
GroupKey groupKey = new GroupKey(groupDescStats.getGroupId());
groupBuilder.setKey(groupKey);
-
+
InstanceIdentifier<Group> groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key)
.augmentation(FlowCapableNode.class)
.child(Group.class,groupKey).toInstance();
GroupDescBuilder stats = new GroupDescBuilder();
stats.fieldsFrom(groupDescStats);
groupDesc.setGroupDesc(stats.build());
-
+
//Update augmented data
groupBuilder.addAugmentation(NodeGroupDescStats.class, groupDesc.build());
@Override
public void onGroupStatisticsUpdated(GroupStatisticsUpdated notification) {
-
+
//Check if response is for the request statistics-manager sent.
if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
return;
- //Add statistics to local cache
- ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
- if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatistics());
- }
- cache.get(notification.getId()).setGroupStatistics(notification.getGroupStats());
-
//Publish data to configuration data store
NodeKey key = new NodeKey(notification.getId());
List<GroupStats> groupStatsList = notification.getGroupStats();
for(GroupStats groupStats : groupStatsList){
DataModificationTransaction it = this.statisticsManager.startChange();
-
+
GroupBuilder groupBuilder = new GroupBuilder();
GroupKey groupKey = new GroupKey(groupStats.getGroupId());
groupBuilder.setKey(groupKey);
-
+
InstanceIdentifier<Group> groupRef = InstanceIdentifier.builder(Nodes.class).child(Node.class,key)
.augmentation(FlowCapableNode.class)
.child(Group.class,groupKey).toInstance();
GroupStatisticsBuilder stats = new GroupStatisticsBuilder();
stats.fieldsFrom(groupStats);
groupStatisticsBuilder.setGroupStatistics(stats.build());
-
+
//Update augmented data
groupBuilder.addAugmentation(NodeGroupStatistics.class, groupStatisticsBuilder.build());
it.putOperationalData(groupRef, groupBuilder.build());
it.commit();
}
}
-
+
@Override
public void onMeterFeaturesUpdated(MeterFeaturesUpdated notification) {
- //Add statistics to local cache
- ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
- if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatistics());
- }
MeterFeaturesBuilder meterFeature = new MeterFeaturesBuilder();
meterFeature.setMeterBandSupported(notification.getMeterBandSupported());
meterFeature.setMeterCapabilitiesSupported(notification.getMeterCapabilitiesSupported());
meterFeature.setMaxBands(notification.getMaxBands());
meterFeature.setMaxColor(notification.getMaxColor());
meterFeature.setMaxMeter(notification.getMaxMeter());
-
- cache.get(notification.getId()).setMeterFeatures(meterFeature.build());
-
+
//Publish data to configuration data store
DataModificationTransaction it = this.statisticsManager.startChange();
NodeKey key = new NodeKey(notification.getId());
NodeRef ref = getNodeRef(key);
-
- final NodeBuilder nodeData = new NodeBuilder();
+
+ final NodeBuilder nodeData = new NodeBuilder();
nodeData.setKey(key);
-
+
NodeMeterFeaturesBuilder nodeMeterFeatures= new NodeMeterFeaturesBuilder();
nodeMeterFeatures.setMeterFeatures(meterFeature.build());
-
+
//Update augmented data
nodeData.addAugmentation(NodeMeterFeatures.class, nodeMeterFeatures.build());
-
+
InstanceIdentifier<? extends Object> refValue = ref.getValue();
it.putOperationalData(refValue, nodeData.build());
it.commit();
}
-
+
@Override
public void onGroupFeaturesUpdated(GroupFeaturesUpdated notification) {
- //Add statistics to local cache
- ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
- if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatistics());
- }
-
GroupFeaturesBuilder groupFeatures = new GroupFeaturesBuilder();
groupFeatures.setActions(notification.getActions());
groupFeatures.setGroupCapabilitiesSupported(notification.getGroupCapabilitiesSupported());
groupFeatures.setGroupTypesSupported(notification.getGroupTypesSupported());
groupFeatures.setMaxGroups(notification.getMaxGroups());
- cache.get(notification.getId()).setGroupFeatures(groupFeatures.build());
-
+
//Publish data to configuration data store
DataModificationTransaction it = this.statisticsManager.startChange();
NodeKey key = new NodeKey(notification.getId());
NodeRef ref = getNodeRef(key);
-
- final NodeBuilder nodeData = new NodeBuilder();
+
+ final NodeBuilder nodeData = new NodeBuilder();
nodeData.setKey(key);
-
+
NodeGroupFeaturesBuilder nodeGroupFeatures= new NodeGroupFeaturesBuilder();
nodeGroupFeatures.setGroupFeatures(groupFeatures.build());
-
+
//Update augmented data
nodeData.addAugmentation(NodeGroupFeatures.class, nodeGroupFeatures.build());
-
+
InstanceIdentifier<? extends Object> refValue = ref.getValue();
it.putOperationalData(refValue, nodeData.build());
it.commit();
@Override
public void onFlowsStatisticsUpdate(FlowsStatisticsUpdate notification) {
-
+
//Check if response is for the request statistics-manager sent.
if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
return;
NodeKey key = new NodeKey(notification.getId());
sucLogger.debug("Received flow stats update : {}",notification.toString());
-
+
for(FlowAndStatisticsMapList map: notification.getFlowAndStatisticsMapList()){
short tableId = map.getTableId();
-
+
DataModificationTransaction it = this.statisticsManager.startChange();
boolean foundOriginalFlow = false;
flow.setPriority(map.getPriority());
flow.setStrict(map.isStrict());
flow.setTableId(tableId);
-
+
Flow flowRule = flow.build();
-
+
FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder();
stats.setByteCount(map.getByteCount());
stats.setPacketCount(map.getPacketCount());
stats.setDuration(map.getDuration());
-
+
GenericStatistics flowStats = stats.build();
-
+
//Add statistics to local cache
- ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
+ ConcurrentMap<NodeId, NodeStatisticsAger> cache = this.statisticsManager.getStatisticsCache();
if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatistics());
+ cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key));
}
- if(!cache.get(notification.getId()).getFlowAndStatsMap().containsKey(tableId)){
- cache.get(notification.getId()).getFlowAndStatsMap().put(tableId, new HashMap<Flow,GenericStatistics>());
- }
- cache.get(notification.getId()).getFlowAndStatsMap().get(tableId).put(flowRule,flowStats);
-
+ NodeStatisticsAger nsa = cache.get(notification.getId());
+ FlowEntry flowStatsEntry = nsa.new FlowEntry(tableId,flowRule);
+ cache.get(notification.getId()).updateFlowStats(flowStatsEntry);
+
//Augment the data to the flow node
FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder();
flowStatistics.setTableId(tableId);
flowStatisticsData.setFlowStatistics(flowStatistics.build());
-
+
sucLogger.debug("Flow : {}",flowRule.toString());
sucLogger.debug("Statistics to augment : {}",flowStatistics.build().toString());
InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
.augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
-
+
Table table= (Table)it.readConfigurationData(tableRef);
//TODO: Not a good way to do it, need to figure out better way.
- //TODO: major issue in any alternate approach is that flow key is incrementally assigned
+ //TODO: major issue in any alternate approach is that flow key is incrementally assigned
//to the flows stored in data store.
if(table != null){
.child(Flow.class,existingFlow.getKey()).toInstance();
flowBuilder.setKey(existingFlow.getKey());
flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
- sucLogger.trace("Found matching flow in the datastore, augmenting statistics");
+ sucLogger.info("Found matching flow in the datastore, augmenting statistics");
foundOriginalFlow = true;
it.putOperationalData(flowRef, flowBuilder.build());
it.commit();
}
}
}
+
+ table= (Table)it.readOperationalData(tableRef);
+ if(!foundOriginalFlow && table != null){
+ for(Flow existingFlow : table.getFlow()){
+ FlowStatisticsData augmentedflowStatisticsData = existingFlow.getAugmentation(FlowStatisticsData.class);
+ if(augmentedflowStatisticsData != null){
+ FlowBuilder existingOperationalFlow = new FlowBuilder();
+ existingOperationalFlow.fieldsFrom(augmentedflowStatisticsData.getFlowStatistics());
+ sucLogger.debug("Existing unaccounted flow in operational data store : {}",existingFlow.toString());
+ if(flowEquals(flowRule,existingOperationalFlow.build())){
+ InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
+ .augmentation(FlowCapableNode.class)
+ .child(Table.class, new TableKey(tableId))
+ .child(Flow.class,existingFlow.getKey()).toInstance();
+ flowBuilder.setKey(existingFlow.getKey());
+ flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
+ sucLogger.debug("Found matching flow in the operational datastore, augmenting statistics");
+ foundOriginalFlow = true;
+ it.putOperationalData(flowRef, flowBuilder.build());
+ it.commit();
+ break;
+ }
+ }
+ }
+ }
if(!foundOriginalFlow){
sucLogger.debug("Associated original flow is not found in data store. Augmenting flow in operational data store");
- //TODO: Temporary fix: format [ 1+tableid+1+unaccounted flow counter]
- long flowKey = Long.parseLong(new String("1"+Short.toString(tableId)+"1"+Integer.toString(this.unaccountedFlowsCounter)));
+ long flowKey = Long.parseLong(new String("1"+Short.toString(tableId)+"0"+Integer.toString(this.unaccountedFlowsCounter)));
+ this.unaccountedFlowsCounter++;
FlowKey newFlowKey = new FlowKey(new FlowId(Long.toString(flowKey)));
InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
.augmentation(FlowCapableNode.class)
.child(Flow.class,newFlowKey).toInstance();
flowBuilder.setKey(newFlowKey);
flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
- sucLogger.trace("Flow was no present in data store, augmenting statistics as an unaccounted flow");
+ sucLogger.info("Flow was no present in data store, augmenting statistics as an unaccounted flow");
it.putOperationalData(flowRef, flowBuilder.build());
it.commit();
}
NodeKey key = new NodeKey(notification.getId());
sucLogger.debug("Received aggregate flow statistics update : {}",notification.toString());
-
+
Short tableId = this.statisticsManager.getMultipartMessageManager().getTableIdForTxId(notification.getTransactionId());
if(tableId != null){
-
+
DataModificationTransaction it = this.statisticsManager.startChange();
InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
aggregateFlowStatisticsBuilder.setFlowCount(notification.getFlowCount());
aggregateFlowStatisticsBuilder.setPacketCount(notification.getPacketCount());
aggregateFlowStatisticsDataBuilder.setAggregateFlowStatistics(aggregateFlowStatisticsBuilder.build());
-
- ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
- if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatistics());
- }
- cache.get(notification.getId()).getTableAndAggregateFlowStatsMap().put(tableId,aggregateFlowStatisticsBuilder.build());
-
+
sucLogger.debug("Augment aggregate statistics: {} for table {} on Node {}",aggregateFlowStatisticsBuilder.build().toString(),tableId,key);
TableBuilder tableBuilder = new TableBuilder();
NodeKey key = new NodeKey(notification.getId());
sucLogger.debug("Received port stats update : {}",notification.toString());
-
- //Add statistics to local cache
- ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
- if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatistics());
- }
-
-
+
List<NodeConnectorStatisticsAndPortNumberMap> portsStats = notification.getNodeConnectorStatisticsAndPortNumberMap();
for(NodeConnectorStatisticsAndPortNumberMap portStats : portsStats){
-
+
DataModificationTransaction it = this.statisticsManager.startChange();
- FlowCapableNodeConnectorStatisticsBuilder statisticsBuilder
+ FlowCapableNodeConnectorStatisticsBuilder statisticsBuilder
= new FlowCapableNodeConnectorStatisticsBuilder();
statisticsBuilder.setBytes(portStats.getBytes());
statisticsBuilder.setCollisionCount(portStats.getCollisionCount());
statisticsBuilder.setReceiveOverRunError(portStats.getReceiveOverRunError());
statisticsBuilder.setTransmitDrops(portStats.getTransmitDrops());
statisticsBuilder.setTransmitErrors(portStats.getTransmitErrors());
-
- //Update data in the cache
- cache.get(notification.getId()).getNodeConnectorStats().put(portStats.getNodeConnectorId(), statisticsBuilder.build());
-
+
//Augment data to the node-connector
- FlowCapableNodeConnectorStatisticsDataBuilder statisticsDataBuilder =
+ FlowCapableNodeConnectorStatisticsDataBuilder statisticsDataBuilder =
new FlowCapableNodeConnectorStatisticsDataBuilder();
-
+
statisticsDataBuilder.setFlowCapableNodeConnectorStatistics(statisticsBuilder.build());
-
+
InstanceIdentifier<NodeConnector> nodeConnectorRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key).child(NodeConnector.class, new NodeConnectorKey(portStats.getNodeConnectorId())).toInstance();
-
+
NodeConnector nodeConnector = (NodeConnector)it.readOperationalData(nodeConnectorRef);
-
+
if(nodeConnector != null){
sucLogger.debug("Augmenting port statistics {} to port {}",statisticsDataBuilder.build().toString(),nodeConnectorRef.toString());
NodeConnectorBuilder nodeConnectorBuilder = new NodeConnectorBuilder();
NodeKey key = new NodeKey(notification.getId());
sucLogger.debug("Received flow table statistics update : {}",notification.toString());
-
+
List<FlowTableAndStatisticsMap> flowTablesStatsList = notification.getFlowTableAndStatisticsMap();
for (FlowTableAndStatisticsMap ftStats : flowTablesStatsList){
-
+
DataModificationTransaction it = this.statisticsManager.startChange();
InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
.augmentation(FlowCapableNode.class).child(Table.class, new TableKey(ftStats.getTableId().getValue())).toInstance();
-
+
FlowTableStatisticsDataBuilder statisticsDataBuilder = new FlowTableStatisticsDataBuilder();
-
+
FlowTableStatisticsBuilder statisticsBuilder = new FlowTableStatisticsBuilder();
statisticsBuilder.setActiveFlows(ftStats.getActiveFlows());
statisticsBuilder.setPacketsLookedUp(ftStats.getPacketsLookedUp());
statisticsBuilder.setPacketsMatched(ftStats.getPacketsMatched());
-
+
statisticsDataBuilder.setFlowTableStatistics(statisticsBuilder.build());
-
- ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
- if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatistics());
- }
- cache.get(notification.getId()).getFlowTableAndStatisticsMap().put(ftStats.getTableId().getValue(),statisticsBuilder.build());
-
+
sucLogger.debug("Augment flow table statistics: {} for table {} on Node {}",statisticsBuilder.build().toString(),ftStats.getTableId(),key);
-
+
TableBuilder tableBuilder = new TableBuilder();
tableBuilder.setKey(new TableKey(ftStats.getTableId().getValue()));
tableBuilder.addAugmentation(FlowTableStatisticsData.class, statisticsDataBuilder.build());
@Override
public void onQueueStatisticsUpdate(QueueStatisticsUpdate notification) {
-
+
//Check if response is for the request statistics-manager sent.
if(this.statisticsManager.getMultipartMessageManager().removeTxId(notification.getTransactionId()) == null)
return;
NodeKey key = new NodeKey(notification.getId());
sucLogger.debug("Received queue stats update : {}",notification.toString());
-
+
//Add statistics to local cache
- ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
+ ConcurrentMap<NodeId, NodeStatisticsAger> cache = this.statisticsManager.getStatisticsCache();
if(!cache.containsKey(notification.getId())){
- cache.put(notification.getId(), new NodeStatistics());
+ cache.put(notification.getId(), new NodeStatisticsAger(statisticsManager,key));
}
-
+
+ NodeStatisticsAger nsa = cache.get(notification.getId());
+
List<QueueIdAndStatisticsMap> queuesStats = notification.getQueueIdAndStatisticsMap();
for(QueueIdAndStatisticsMap swQueueStats : queuesStats){
-
- if(!cache.get(notification.getId()).getNodeConnectorAndQueuesStatsMap().containsKey(swQueueStats.getNodeConnectorId())){
- cache.get(notification.getId()).getNodeConnectorAndQueuesStatsMap().put(swQueueStats.getNodeConnectorId(), new HashMap<QueueId,GenericQueueStatistics>());
- }
-
+
+ QueueEntry queueEntry = nsa.new QueueEntry(swQueueStats.getNodeConnectorId(),swQueueStats.getQueueId());
+ nsa.updateQueueStats(queueEntry);
+
FlowCapableNodeConnectorQueueStatisticsDataBuilder queueStatisticsDataBuilder = new FlowCapableNodeConnectorQueueStatisticsDataBuilder();
-
+
FlowCapableNodeConnectorQueueStatisticsBuilder queueStatisticsBuilder = new FlowCapableNodeConnectorQueueStatisticsBuilder();
-
+
queueStatisticsBuilder.fieldsFrom(swQueueStats);
-
+
queueStatisticsDataBuilder.setFlowCapableNodeConnectorQueueStatistics(queueStatisticsBuilder.build());
-
- cache.get(notification.getId()).getNodeConnectorAndQueuesStatsMap()
- .get(swQueueStats.getNodeConnectorId())
- .put(swQueueStats.getQueueId(), queueStatisticsBuilder.build());
-
-
+
DataModificationTransaction it = this.statisticsManager.startChange();
- InstanceIdentifier<Queue> queueRef
+ InstanceIdentifier<Queue> queueRef
= InstanceIdentifier.builder(Nodes.class)
.child(Node.class, key)
.child(NodeConnector.class, new NodeConnectorKey(swQueueStats.getNodeConnectorId()))
.augmentation(FlowCapableNodeConnector.class)
.child(Queue.class, new QueueKey(swQueueStats.getQueueId())).toInstance();
-
+
QueueBuilder queueBuilder = new QueueBuilder();
queueBuilder.addAugmentation(FlowCapableNodeConnectorQueueStatisticsData.class, queueStatisticsDataBuilder.build());
queueBuilder.setKey(new QueueKey(swQueueStats.getQueueId()));
- sucLogger.trace("Augmenting queue statistics {} of queue {} to port {}"
+ sucLogger.info("Augmenting queue statistics {} of queue {} to port {}"
,queueStatisticsDataBuilder.build().toString(),
swQueueStats.getQueueId(),
swQueueStats.getNodeConnectorId());
-
+
it.putOperationalData(queueRef, queueBuilder.build());
it.commit();
-
+
}
-
+
}
private NodeRef getNodeRef(NodeKey nodeKey){
InstanceIdentifierBuilder<?> builder = InstanceIdentifier.builder(Nodes.class).child(Node.class, nodeKey);
return new NodeRef(builder.toInstance());
}
-
+
public boolean flowEquals(Flow statsFlow, Flow storedFlow) {
if (statsFlow.getClass() != storedFlow.getClass()) {
return false;
}
return true;
}
-
+
/**
* Explicit equals method to compare the 'match' for flows stored in the data-stores and flow fetched from the switch.
- * Usecase: e.g If user don't set any ethernet source and destination address for match,data store will store null for
+ * Usecase: e.g If user don't set any ethernet source and destination address for match,data store will store null for
* these address.
* e.g [_ethernetMatch=EthernetMatch [_ethernetDestination=null, _ethernetSource=null, _ethernetType=
* EthernetType [_type=EtherType [_value=2048], _mask=null, augmentation=[]]
- *
- * But when you fetch the flows from switch, openflow driver library converts all zero bytes of mac address in the
- * message stream to 00:00:00:00:00:00. Following string shows how library interpret the zero mac address bytes and
- * eventually when translator convert it to MD-SAL match, this is how it looks
- * [_ethernetDestination=EthernetDestination [_address=MacAddress [_value=00:00:00:00:00:00], _mask=null, augmentation=[]],
- * _ethernetSource=EthernetSource [_address=MacAddress [_value=00:00:00:00:00:00], _mask=null, augmentation=[]],
+ *
+ * But when you fetch the flows from switch, openflow driver library converts all zero bytes of mac address in the
+ * message stream to 00:00:00:00:00:00. Following string shows how library interpret the zero mac address bytes and
+ * eventually when translator convert it to MD-SAL match, this is how it looks
+ * [_ethernetDestination=EthernetDestination [_address=MacAddress [_value=00:00:00:00:00:00], _mask=null, augmentation=[]],
+ * _ethernetSource=EthernetSource [_address=MacAddress [_value=00:00:00:00:00:00], _mask=null, augmentation=[]],
* _ethernetType=EthernetType [_type=EtherType [_value=2048], _mask=null, augmentation=[]]
- *
- * Similarly for inPort, if user/application don't set any value for it, FRM will store null value for it in data store.
+ *
+ * Similarly for inPort, if user/application don't set any value for it, FRM will store null value for it in data store.
* When we fetch the same flow (with its statistics) from switch, plugin converts its value to openflow:X:0.
- * e.g _inPort=Uri [_value=openflow:1:0]
- *
+ * e.g _inPort=Uri [_value=openflow:1:0]
+ *
* So this custom equals method add additional check to take care of these scenario, in case any match element is null in data-store-flow, but not
* in the flow fetched from switch.
- *
+ *
* @param statsFlow
* @param storedFlow
* @return
--- /dev/null
+/*
+ * Copyright IBM Corporation, 2013. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.statistics.manager;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupDescStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterConfigStats;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.FlowCapableNodeConnectorQueueStatisticsData;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Following are two main responsibilities of the class
+ * 1) Listen for the create changes in config data store for tree nodes (Flow,Group,Meter,Queue)
+ * and send statistics request to the switch to fetch the statistics
+ *
+ * 2)Listen for the remove changes in config data store for tree nodes (Flow,Group,Meter,Queue)
+ * and remove the relative statistics data from operational data store.
+ *
+ * @author avishnoi@in.ibm.com
+ *
+ */
+public class StatisticsUpdateHandler implements DataChangeListener {
+
+ public final static Logger suhLogger = LoggerFactory.getLogger(StatisticsUpdateHandler.class);
+
+ private final StatisticsProvider statisticsManager;
+
+ public StatisticsUpdateHandler(final StatisticsProvider manager){
+
+ this.statisticsManager = manager;
+ }
+
+ public StatisticsProvider getStatisticsManager(){
+ return statisticsManager;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onDataChanged(DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+
+ Map<InstanceIdentifier<?>, DataObject> additions = change.getCreatedConfigurationData();
+ for (InstanceIdentifier<? extends DataObject> dataObjectInstance : additions.keySet()) {
+ DataObject dataObject = additions.get(dataObjectInstance);
+ InstanceIdentifier<Node> nodeII = dataObjectInstance.firstIdentifierOf(Node.class);
+ NodeRef nodeRef = new NodeRef(nodeII);
+ if(dataObject instanceof Flow){
+ Flow flow = (Flow) dataObject;
+ try {
+ this.statisticsManager.sendFlowStatsFromTableRequest(nodeRef, flow);
+ } catch (InterruptedException | ExecutionException e) {
+ suhLogger.warn("Following exception occured while sending flow statistics request newly added flow: {}", e);
+ }
+ }
+ if(dataObject instanceof Meter){
+ try {
+ this.statisticsManager.sendMeterConfigStatisticsRequest(nodeRef);
+ } catch (InterruptedException | ExecutionException e) {
+ suhLogger.warn("Following exception occured while sending meter statistics request for newly added meter: {}", e);
+ }
+ }
+ if(dataObject instanceof Group){
+ try {
+ this.statisticsManager.sendGroupDescriptionRequest(nodeRef);
+ } catch (InterruptedException | ExecutionException e) {
+ suhLogger.warn("Following exception occured while sending group description request for newly added group: {}", e);
+ }
+ }
+ if(dataObject instanceof Queue){
+ Queue queue = (Queue) dataObject;
+ InstanceIdentifier<NodeConnector> nodeConnectorII = dataObjectInstance.firstIdentifierOf(NodeConnector.class);
+ NodeConnectorKey nodeConnectorKey = InstanceIdentifier.keyOf(nodeConnectorII);
+ try {
+ this.statisticsManager.sendQueueStatsFromGivenNodeConnector(nodeRef, nodeConnectorKey.getId(), queue.getQueueId());
+ } catch (InterruptedException | ExecutionException e) {
+ suhLogger.warn("Following exception occured while sending queue statistics request for newly added group: {}", e);
+ }
+ }
+ }
+
+ Set<InstanceIdentifier<? extends DataObject>> removals = change.getRemovedConfigurationData();
+ for (InstanceIdentifier<? extends DataObject> dataObjectInstance : removals) {
+ DataObject dataObject = change.getOriginalConfigurationData().get(dataObjectInstance);
+
+ if(dataObject instanceof Flow){
+ InstanceIdentifier<Flow> flowII = (InstanceIdentifier<Flow>)dataObjectInstance;
+ InstanceIdentifier<?> flowAugmentation =
+ InstanceIdentifier.builder(flowII).augmentation(FlowStatisticsData.class).toInstance();
+ removeAugmentedOperationalData(flowAugmentation);
+ }
+ if(dataObject instanceof Meter){
+ InstanceIdentifier<Meter> meterII = (InstanceIdentifier<Meter>)dataObjectInstance;
+
+ InstanceIdentifier<?> nodeMeterConfigStatsAugmentation =
+ InstanceIdentifier.builder(meterII).augmentation(NodeMeterConfigStats.class).toInstance();
+ removeAugmentedOperationalData(nodeMeterConfigStatsAugmentation);
+
+ InstanceIdentifier<?> nodeMeterStatisticsAugmentation =
+ InstanceIdentifier.builder(meterII).augmentation(NodeMeterStatistics.class).toInstance();
+ removeAugmentedOperationalData(nodeMeterStatisticsAugmentation);
+ }
+
+ if(dataObject instanceof Group){
+ InstanceIdentifier<Group> groupII = (InstanceIdentifier<Group>)dataObjectInstance;
+
+ InstanceIdentifier<?> nodeGroupDescStatsAugmentation =
+ InstanceIdentifier.builder(groupII).augmentation(NodeGroupDescStats.class).toInstance();
+ removeAugmentedOperationalData(nodeGroupDescStatsAugmentation);
+
+ InstanceIdentifier<?> nodeGroupStatisticsAugmentation =
+ InstanceIdentifier.builder(groupII).augmentation(NodeGroupStatistics.class).toInstance();
+ removeAugmentedOperationalData(nodeGroupStatisticsAugmentation);
+ }
+
+ if(dataObject instanceof Queue){
+ InstanceIdentifier<Queue> queueII = (InstanceIdentifier<Queue>)dataObjectInstance;
+
+ InstanceIdentifier<?> nodeConnectorQueueStatisticsDataAugmentation =
+ InstanceIdentifier.builder(queueII).augmentation(FlowCapableNodeConnectorQueueStatisticsData.class).toInstance();
+ removeAugmentedOperationalData(nodeConnectorQueueStatisticsDataAugmentation);
+ }
+ }
+ }
+
+ private void removeAugmentedOperationalData(InstanceIdentifier<? extends DataObject> dataObjectInstance ){
+ if(dataObjectInstance != null){
+ DataModificationTransaction it = this.statisticsManager.startChange();
+ it.removeOperationalData(dataObjectInstance);
+ it.commit();
+ }
+ }
+}