From cb016426ddc0f08cee3c9475f4214388c0035edd Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 12 Feb 2014 03:07:45 +0100 Subject: [PATCH] Move flow statistics update handling This finishes the cleanup of StatisticsUpdateCommiter, we now have all per-node statistics processing guarded by the lock -- e.g. have predictable and isolated updates to statistics. Change-Id: Ia58983d5d1dc857e5239a68e3e94fc7d3676cf6d Signed-off-by: Robert Varga --- .../manager/NodeStatisticsAger.java | 166 +++++++++++++++- .../manager/StatisticsUpdateCommiter.java | 181 +----------------- 2 files changed, 166 insertions(+), 181 deletions(-) diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsAger.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsAger.java index 8e68515c32..990e6bb4ff 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsAger.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsAger.java @@ -17,6 +17,7 @@ import java.util.concurrent.TimeUnit; import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey; @@ -24,10 +25,16 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsData; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsDataBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap; @@ -73,6 +80,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats; import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.AggregateFlowStatistics; +import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.flow.capable.node.connector.statistics.FlowCapableNodeConnectorStatisticsBuilder; @@ -105,6 +113,7 @@ public class NodeStatisticsAger { private final InstanceIdentifier targetNodeIdentifier; private final StatisticsProvider statisticsProvider; private final NodeKey targetNodeKey; + private int unaccountedFlowsCounter = 1; public NodeStatisticsAger(StatisticsProvider statisticsProvider, NodeKey nodeKey){ this.statisticsProvider = Preconditions.checkNotNull(statisticsProvider); @@ -514,8 +523,160 @@ public class NodeStatisticsAger { trans.commit(); } - public synchronized void updateFlowStats(FlowEntry flowEntry){ - this.flowStatsUpdate.put(flowEntry, getExpiryTime()); + public synchronized void updateFlowStats(List list) { + final Long expiryTime = getExpiryTime(); + final DataModificationTransaction trans = statisticsProvider.startChange(); + + for(FlowAndStatisticsMapList map : list) { + short tableId = map.getTableId(); + boolean foundOriginalFlow = false; + + FlowBuilder flowBuilder = new FlowBuilder(); + + FlowStatisticsDataBuilder flowStatisticsData = new FlowStatisticsDataBuilder(); + + FlowBuilder flow = new FlowBuilder(); + flow.setContainerName(map.getContainerName()); + flow.setBufferId(map.getBufferId()); + flow.setCookie(map.getCookie()); + flow.setCookieMask(map.getCookieMask()); + flow.setFlags(map.getFlags()); + flow.setFlowName(map.getFlowName()); + flow.setHardTimeout(map.getHardTimeout()); + if(map.getFlowId() != null) + flow.setId(new FlowId(map.getFlowId().getValue())); + flow.setIdleTimeout(map.getIdleTimeout()); + flow.setInstallHw(map.isInstallHw()); + flow.setInstructions(map.getInstructions()); + if(map.getFlowId()!= null) + flow.setKey(new FlowKey(new FlowId(map.getKey().getFlowId().getValue()))); + flow.setMatch(map.getMatch()); + flow.setOutGroup(map.getOutGroup()); + flow.setOutPort(map.getOutPort()); + flow.setPriority(map.getPriority()); + flow.setStrict(map.isStrict()); + flow.setTableId(tableId); + + Flow flowRule = flow.build(); + + FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder(); + stats.setByteCount(map.getByteCount()); + stats.setPacketCount(map.getPacketCount()); + stats.setDuration(map.getDuration()); + + GenericStatistics flowStats = stats.build(); + + //Augment the data to the flow node + + FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder(); + flowStatistics.setByteCount(flowStats.getByteCount()); + flowStatistics.setPacketCount(flowStats.getPacketCount()); + flowStatistics.setDuration(flowStats.getDuration()); + flowStatistics.setContainerName(map.getContainerName()); + flowStatistics.setBufferId(map.getBufferId()); + flowStatistics.setCookie(map.getCookie()); + flowStatistics.setCookieMask(map.getCookieMask()); + flowStatistics.setFlags(map.getFlags()); + flowStatistics.setFlowName(map.getFlowName()); + flowStatistics.setHardTimeout(map.getHardTimeout()); + flowStatistics.setIdleTimeout(map.getIdleTimeout()); + flowStatistics.setInstallHw(map.isInstallHw()); + flowStatistics.setInstructions(map.getInstructions()); + flowStatistics.setMatch(map.getMatch()); + flowStatistics.setOutGroup(map.getOutGroup()); + flowStatistics.setOutPort(map.getOutPort()); + flowStatistics.setPriority(map.getPriority()); + flowStatistics.setStrict(map.isStrict()); + flowStatistics.setTableId(tableId); + + flowStatisticsData.setFlowStatistics(flowStatistics.build()); + + logger.debug("Flow : {}",flowRule.toString()); + logger.debug("Statistics to augment : {}",flowStatistics.build().toString()); + + InstanceIdentifier tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey) + .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance(); + + Table table= (Table)trans.readConfigurationData(tableRef); + + //TODO: Not a good way to do it, need to figure out better way. + //TODO: major issue in any alternate approach is that flow key is incrementally assigned + //to the flows stored in data store. + // Augment same statistics to all the matching masked flow + if(table != null){ + + for(Flow existingFlow : table.getFlow()){ + logger.debug("Existing flow in data store : {}",existingFlow.toString()); + if(FlowComparator.flowEquals(flowRule,existingFlow)){ + InstanceIdentifier flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey) + .augmentation(FlowCapableNode.class) + .child(Table.class, new TableKey(tableId)) + .child(Flow.class,existingFlow.getKey()).toInstance(); + flowBuilder.setKey(existingFlow.getKey()); + flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build()); + logger.debug("Found matching flow in the datastore, augmenting statistics"); + foundOriginalFlow = true; + // Update entry with timestamp of latest response + flow.setKey(existingFlow.getKey()); + FlowEntry flowStatsEntry = new FlowEntry(tableId,flow.build()); + flowStatsUpdate.put(flowStatsEntry, expiryTime); + + trans.putOperationalData(flowRef, flowBuilder.build()); + } + } + } + + table = (Table)trans.readOperationalData(tableRef); + if(!foundOriginalFlow && table != null){ + + for(Flow existingFlow : table.getFlow()){ + FlowStatisticsData augmentedflowStatisticsData = existingFlow.getAugmentation(FlowStatisticsData.class); + if(augmentedflowStatisticsData != null){ + FlowBuilder existingOperationalFlow = new FlowBuilder(); + existingOperationalFlow.fieldsFrom(augmentedflowStatisticsData.getFlowStatistics()); + logger.debug("Existing unaccounted flow in operational data store : {}",existingFlow.toString()); + if(FlowComparator.flowEquals(flowRule,existingOperationalFlow.build())){ + InstanceIdentifier flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey) + .augmentation(FlowCapableNode.class) + .child(Table.class, new TableKey(tableId)) + .child(Flow.class,existingFlow.getKey()).toInstance(); + flowBuilder.setKey(existingFlow.getKey()); + flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build()); + logger.debug("Found matching unaccounted flow in the operational datastore, augmenting statistics"); + foundOriginalFlow = true; + + // Update entry with timestamp of latest response + flow.setKey(existingFlow.getKey()); + FlowEntry flowStatsEntry = new FlowEntry(tableId,flow.build()); + flowStatsUpdate.put(flowStatsEntry, expiryTime); + trans.putOperationalData(flowRef, flowBuilder.build()); + break; + } + } + } + } + if(!foundOriginalFlow){ + String flowKey = "#UF$TABLE*"+Short.toString(tableId)+"*"+Integer.toString(this.unaccountedFlowsCounter); + this.unaccountedFlowsCounter++; + FlowKey newFlowKey = new FlowKey(new FlowId(flowKey)); + InstanceIdentifier flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey) + .augmentation(FlowCapableNode.class) + .child(Table.class, new TableKey(tableId)) + .child(Flow.class,newFlowKey).toInstance(); + flowBuilder.setKey(newFlowKey); + flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build()); + logger.debug("Flow {} is not present in config data store, augmenting statistics as an unaccounted flow", + flowBuilder.build()); + + // Update entry with timestamp of latest response + flow.setKey(newFlowKey); + FlowEntry flowStatsEntry = new FlowEntry(tableId,flow.build()); + flowStatsUpdate.put(flowStatsEntry, expiryTime); + trans.putOperationalData(flowRef, flowBuilder.build()); + } + } + + trans.commit(); } private static Long getExpiryTime(){ @@ -612,5 +773,4 @@ public class NodeStatisticsAger { InstanceIdentifier nodeGroupStatisticsAugmentation = groupRef.augmentation(NodeGroupStatistics.class).toInstance(); trans.removeOperationalData(nodeGroupStatisticsAugmentation); } - } diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java index c60991e0de..317d2963f6 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java @@ -7,42 +7,23 @@ */ package org.opendaylight.controller.md.statistics.manager; -import org.opendaylight.controller.md.statistics.manager.NodeStatisticsAger.FlowEntry; -import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdate; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupDescStatsUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupFeaturesUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupStatisticsUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterConfigStatsUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterFeaturesUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterStatisticsUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener; -import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.QueueStatisticsUpdate; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,12 +43,9 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList OpendaylightQueueStatisticsListener{ private final static Logger sucLogger = LoggerFactory.getLogger(StatisticsUpdateCommiter.class); - private final StatisticsProvider statisticsManager; private final MultipartMessageManager messageManager; - private int unaccountedFlowsCounter = 1; - /** * default ctor * @param manager @@ -150,163 +128,10 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList return; sucLogger.debug("Received flow stats update : {}",notification.toString()); - - final NodeKey key = new NodeKey(notification.getId()); - final NodeStatisticsAger nsa = this.statisticsManager.getStatisticsHandler(key.getId()); - DataModificationTransaction it = this.statisticsManager.startChange(); - - for(FlowAndStatisticsMapList map: notification.getFlowAndStatisticsMapList()){ - short tableId = map.getTableId(); - - - boolean foundOriginalFlow = false; - - FlowBuilder flowBuilder = new FlowBuilder(); - - FlowStatisticsDataBuilder flowStatisticsData = new FlowStatisticsDataBuilder(); - - FlowBuilder flow = new FlowBuilder(); - flow.setContainerName(map.getContainerName()); - flow.setBufferId(map.getBufferId()); - flow.setCookie(map.getCookie()); - flow.setCookieMask(map.getCookieMask()); - flow.setFlags(map.getFlags()); - flow.setFlowName(map.getFlowName()); - flow.setHardTimeout(map.getHardTimeout()); - if(map.getFlowId() != null) - flow.setId(new FlowId(map.getFlowId().getValue())); - flow.setIdleTimeout(map.getIdleTimeout()); - flow.setInstallHw(map.isInstallHw()); - flow.setInstructions(map.getInstructions()); - if(map.getFlowId()!= null) - flow.setKey(new FlowKey(new FlowId(map.getKey().getFlowId().getValue()))); - flow.setMatch(map.getMatch()); - flow.setOutGroup(map.getOutGroup()); - flow.setOutPort(map.getOutPort()); - flow.setPriority(map.getPriority()); - flow.setStrict(map.isStrict()); - flow.setTableId(tableId); - - Flow flowRule = flow.build(); - - FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder(); - stats.setByteCount(map.getByteCount()); - stats.setPacketCount(map.getPacketCount()); - stats.setDuration(map.getDuration()); - - GenericStatistics flowStats = stats.build(); - - //Augment the data to the flow node - - FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder(); - flowStatistics.setByteCount(flowStats.getByteCount()); - flowStatistics.setPacketCount(flowStats.getPacketCount()); - flowStatistics.setDuration(flowStats.getDuration()); - flowStatistics.setContainerName(map.getContainerName()); - flowStatistics.setBufferId(map.getBufferId()); - flowStatistics.setCookie(map.getCookie()); - flowStatistics.setCookieMask(map.getCookieMask()); - flowStatistics.setFlags(map.getFlags()); - flowStatistics.setFlowName(map.getFlowName()); - flowStatistics.setHardTimeout(map.getHardTimeout()); - flowStatistics.setIdleTimeout(map.getIdleTimeout()); - flowStatistics.setInstallHw(map.isInstallHw()); - flowStatistics.setInstructions(map.getInstructions()); - flowStatistics.setMatch(map.getMatch()); - flowStatistics.setOutGroup(map.getOutGroup()); - flowStatistics.setOutPort(map.getOutPort()); - flowStatistics.setPriority(map.getPriority()); - flowStatistics.setStrict(map.isStrict()); - flowStatistics.setTableId(tableId); - - flowStatisticsData.setFlowStatistics(flowStatistics.build()); - - sucLogger.debug("Flow : {}",flowRule.toString()); - sucLogger.debug("Statistics to augment : {}",flowStatistics.build().toString()); - - InstanceIdentifier
tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key) - .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance(); - - Table table= (Table)it.readConfigurationData(tableRef); - - //TODO: Not a good way to do it, need to figure out better way. - //TODO: major issue in any alternate approach is that flow key is incrementally assigned - //to the flows stored in data store. - // Augment same statistics to all the matching masked flow - if(table != null){ - - for(Flow existingFlow : table.getFlow()){ - sucLogger.debug("Existing flow in data store : {}",existingFlow.toString()); - if(FlowComparator.flowEquals(flowRule,existingFlow)){ - InstanceIdentifier flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key) - .augmentation(FlowCapableNode.class) - .child(Table.class, new TableKey(tableId)) - .child(Flow.class,existingFlow.getKey()).toInstance(); - flowBuilder.setKey(existingFlow.getKey()); - flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build()); - sucLogger.debug("Found matching flow in the datastore, augmenting statistics"); - foundOriginalFlow = true; - // Update entry with timestamp of latest response - flow.setKey(existingFlow.getKey()); - FlowEntry flowStatsEntry = nsa.new FlowEntry(tableId,flow.build()); - nsa.updateFlowStats(flowStatsEntry); - - it.putOperationalData(flowRef, flowBuilder.build()); - } - } - } - - table= (Table)it.readOperationalData(tableRef); - if(!foundOriginalFlow && table != null){ - - for(Flow existingFlow : table.getFlow()){ - FlowStatisticsData augmentedflowStatisticsData = existingFlow.getAugmentation(FlowStatisticsData.class); - if(augmentedflowStatisticsData != null){ - FlowBuilder existingOperationalFlow = new FlowBuilder(); - existingOperationalFlow.fieldsFrom(augmentedflowStatisticsData.getFlowStatistics()); - sucLogger.debug("Existing unaccounted flow in operational data store : {}",existingFlow.toString()); - if(FlowComparator.flowEquals(flowRule,existingOperationalFlow.build())){ - InstanceIdentifier flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key) - .augmentation(FlowCapableNode.class) - .child(Table.class, new TableKey(tableId)) - .child(Flow.class,existingFlow.getKey()).toInstance(); - flowBuilder.setKey(existingFlow.getKey()); - flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build()); - sucLogger.debug("Found matching unaccounted flow in the operational datastore, augmenting statistics"); - foundOriginalFlow = true; - - // Update entry with timestamp of latest response - flow.setKey(existingFlow.getKey()); - FlowEntry flowStatsEntry = nsa.new FlowEntry(tableId,flow.build()); - nsa.updateFlowStats(flowStatsEntry); - - it.putOperationalData(flowRef, flowBuilder.build()); - break; - } - } - } - } - if(!foundOriginalFlow){ - String flowKey = "#UF$TABLE*"+Short.toString(tableId)+"*"+Integer.toString(this.unaccountedFlowsCounter); - this.unaccountedFlowsCounter++; - FlowKey newFlowKey = new FlowKey(new FlowId(flowKey)); - InstanceIdentifier flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key) - .augmentation(FlowCapableNode.class) - .child(Table.class, new TableKey(tableId)) - .child(Flow.class,newFlowKey).toInstance(); - flowBuilder.setKey(newFlowKey); - flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build()); - sucLogger.debug("Flow {} is not present in config data store, augmenting statistics as an unaccounted flow",flowBuilder.build()); - - // Update entry with timestamp of latest response - flow.setKey(newFlowKey); - FlowEntry flowStatsEntry = nsa.new FlowEntry(tableId,flow.build()); - nsa.updateFlowStats(flowStatsEntry); - - it.putOperationalData(flowRef, flowBuilder.build()); - } + final NodeStatisticsAger sna = this.statisticsManager.getStatisticsHandler(notification.getId()); + if (sna != null) { + sna.updateFlowStats(notification.getFlowAndStatisticsMapList()); } - it.commit(); } @Override -- 2.36.6