*/
package org.opendaylight.controller.md.statistics.manager;
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map.Entry;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCookieMapping;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowCookieMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowCookieMapBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowCookieMapKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
+import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class FlowStatsTracker extends AbstractStatsTracker<FlowAndStatisticsMapList, FlowStatsEntry> {
- private static final Logger logger = LoggerFactory.getLogger(FlowStatsTracker.class);
+import com.google.common.base.Optional;
+
+final class FlowStatsTracker extends AbstractListeningStatsTracker<FlowAndStatisticsMapList, FlowStatsEntry> {
+ private static final Logger LOG = LoggerFactory.getLogger(FlowStatsTracker.class);
+ private static final String ALIEN_SYSTEM_FLOW_ID = "#UF$TABLE*";
+ private final OpendaylightFlowStatisticsService flowStatsService;
+ private FlowTableStatsTracker flowTableStats;
private int unaccountedFlowsCounter = 1;
- FlowStatsTracker(InstanceIdentifier<Node> nodeIdentifier, DataProviderService dps, long lifetimeNanos) {
- super(nodeIdentifier, dps, lifetimeNanos);
+
+ FlowStatsTracker(final OpendaylightFlowStatisticsService flowStatsService, final FlowCapableContext context) {
+ super(context);
+ this.flowStatsService = flowStatsService;
+ }
+ FlowStatsTracker(final OpendaylightFlowStatisticsService flowStatsService, final FlowCapableContext context, final FlowTableStatsTracker flowTableStats) {
+ this(flowStatsService, context);
+ this.flowTableStats = flowTableStats;
}
@Override
- protected void cleanupSingleStat(DataModificationTransaction trans, FlowStatsEntry item) {
- InstanceIdentifier<?> flowRef = getNodeIdentifierBuilder()
- .augmentation(FlowCapableNode.class)
- .child(Table.class, new TableKey(item.getTableId()))
- .child(Flow.class,item.getFlow().getKey())
- .augmentation(FlowStatisticsData.class).toInstance();
+ protected void cleanupSingleStat(final DataModificationTransaction trans, final FlowStatsEntry item) {
+ KeyedInstanceIdentifier<Flow, FlowKey> flowRef = getNodeIdentifier()
+ .augmentation(FlowCapableNode.class)
+ .child(Table.class, new TableKey(item.getTableId()))
+ .child(Flow.class, item.getFlow().getKey());
trans.removeOperationalData(flowRef);
}
@Override
- protected FlowStatsEntry updateSingleStat(DataModificationTransaction trans, FlowAndStatisticsMapList map) {
+ protected FlowStatsEntry updateSingleStat(final DataModificationTransaction trans, final FlowAndStatisticsMapList map) {
short tableId = map.getTableId();
- FlowBuilder flowBuilder = new FlowBuilder();
-
FlowStatisticsDataBuilder flowStatisticsData = new FlowStatisticsDataBuilder();
- FlowBuilder flow = new FlowBuilder();
- flow.setContainerName(map.getContainerName());
- flow.setBufferId(map.getBufferId());
- flow.setCookie(map.getCookie());
- flow.setCookieMask(map.getCookieMask());
- flow.setFlags(map.getFlags());
- flow.setFlowName(map.getFlowName());
- flow.setHardTimeout(map.getHardTimeout());
- if(map.getFlowId() != null)
- flow.setId(new FlowId(map.getFlowId().getValue()));
- flow.setIdleTimeout(map.getIdleTimeout());
- flow.setInstallHw(map.isInstallHw());
- flow.setInstructions(map.getInstructions());
- if(map.getFlowId()!= null)
- flow.setKey(new FlowKey(new FlowId(map.getKey().getFlowId().getValue())));
- flow.setMatch(map.getMatch());
- flow.setOutGroup(map.getOutGroup());
- flow.setOutPort(map.getOutPort());
- flow.setPriority(map.getPriority());
- flow.setStrict(map.isStrict());
- flow.setTableId(tableId);
-
- Flow flowRule = flow.build();
+ FlowBuilder flowBuilder = new FlowBuilder(map);
+ if (map.getFlowId() != null) {
+ flowBuilder.setId(new FlowId(map.getFlowId().getValue()));
+ }
+ if (map.getFlowId() != null) {
+ flowBuilder.setKey(new FlowKey(new FlowId(map.getKey().getFlowId().getValue())));
+ }
+
+ Flow flowRule = flowBuilder.build();
FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder();
stats.setByteCount(map.getByteCount());
flowStatistics.setByteCount(flowStats.getByteCount());
flowStatistics.setPacketCount(flowStats.getPacketCount());
flowStatistics.setDuration(flowStats.getDuration());
- flowStatistics.setContainerName(map.getContainerName());
- flowStatistics.setBufferId(map.getBufferId());
- flowStatistics.setCookie(map.getCookie());
- flowStatistics.setCookieMask(map.getCookieMask());
- flowStatistics.setFlags(map.getFlags());
- flowStatistics.setFlowName(map.getFlowName());
- flowStatistics.setHardTimeout(map.getHardTimeout());
- flowStatistics.setIdleTimeout(map.getIdleTimeout());
- flowStatistics.setInstallHw(map.isInstallHw());
- flowStatistics.setInstructions(map.getInstructions());
- flowStatistics.setMatch(map.getMatch());
- flowStatistics.setOutGroup(map.getOutGroup());
- flowStatistics.setOutPort(map.getOutPort());
- flowStatistics.setPriority(map.getPriority());
- flowStatistics.setStrict(map.isStrict());
- flowStatistics.setTableId(tableId);
flowStatisticsData.setFlowStatistics(flowStatistics.build());
- logger.debug("Flow : {}",flowRule.toString());
- logger.debug("Statistics to augment : {}",flowStatistics.build().toString());
+ LOG.debug("Flow : {}",flowRule.toString());
+ LOG.debug("Statistics to augment : {}",flowStatistics.build().toString());
InstanceIdentifier<Table> tableRef = getNodeIdentifierBuilder()
- .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
+ .augmentation(FlowCapableNode.class)
+ .child(Table.class, new TableKey(tableId)).toInstance();
- //TODO: Not a good way to do it, need to figure out better way.
- //TODO: major issue in any alternate approach is that flow key is incrementally assigned
- //to the flows stored in data store.
- // Augment same statistics to all the matching masked flow
- Table table= (Table)trans.readConfigurationData(tableRef);
- if(table != null){
- for(Flow existingFlow : table.getFlow()){
- logger.debug("Existing flow in data store : {}",existingFlow.toString());
- if(FlowComparator.flowEquals(flowRule,existingFlow)){
- InstanceIdentifier<Flow> flowRef = getNodeIdentifierBuilder()
- .augmentation(FlowCapableNode.class)
- .child(Table.class, new TableKey(tableId))
- .child(Flow.class,existingFlow.getKey()).toInstance();
- flowBuilder.setKey(existingFlow.getKey());
- flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
- logger.debug("Found matching flow in the datastore, augmenting statistics");
- // Update entry with timestamp of latest response
- flow.setKey(existingFlow.getKey());
- FlowStatsEntry flowStatsEntry = new FlowStatsEntry(tableId,flow.build());
- trans.putOperationalData(flowRef, flowBuilder.build());
- return flowStatsEntry;
- }
+ final FlowCookie flowCookie = flowRule.getCookie() != null
+ ? flowRule.getCookie() : new FlowCookie(BigInteger.ZERO);
+ final InstanceIdentifier<FlowCookieMap> flowCookieRef = tableRef
+ .augmentation(FlowCookieMapping.class)
+ .child(FlowCookieMap.class, new FlowCookieMapKey(flowCookie));
+
+ FlowCookieMap cookieMap = (FlowCookieMap) trans.readOperationalData(flowCookieRef);
+
+ /* find flowKey in FlowCookieMap from DataStore/OPERATIONAL */
+ Optional<FlowKey> flowKey = this.getExistFlowKey(flowRule, tableRef, trans, cookieMap);
+ if ( ! flowKey.isPresent()) {
+ /* DataStore/CONFIG For every first statistic needs to be created */
+ flowKey = this.getFlowKeyFromExistFlow(flowRule, tableRef, trans);
+ if ( ! flowKey.isPresent()) {
+ /* Alien flow */
+ flowKey = this.makeAlienFlowKey(flowRule);
+ }
+ cookieMap = applyNewFlowKey(cookieMap, flowKey, flowCookie);
+ trans.putOperationalData(flowCookieRef, cookieMap);
+ }
+
+ InstanceIdentifier<Flow> flowRef = getNodeIdentifierBuilder()
+ .augmentation(FlowCapableNode.class)
+ .child(Table.class, new TableKey(tableId))
+ .child(Flow.class, flowKey.get()).toInstance();
+ flowBuilder.setKey(flowKey.get());
+ flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
+
+ // Update entry with timestamp of latest response
+ flowBuilder.setKey(flowKey.get());
+ FlowStatsEntry flowStatsEntry = new FlowStatsEntry(tableId, flowBuilder.build());
+ trans.putOperationalData(flowRef, flowBuilder.build());
+ return flowStatsEntry;
+ }
+
+ @Override
+ protected InstanceIdentifier<?> listenPath() {
+ return getNodeIdentifierBuilder().augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class).build();
+ }
+
+ @Override
+ protected String statName() {
+ return "Flow";
+ }
+
+ @Override
+ public void request() {
+ // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
+ // comes back -- we do not have any tables anyway.
+ final Collection<TableKey> tables = flowTableStats.getTables();
+ LOG.debug("Node {} supports {} table(s)", this.getNodeRef(), tables.size());
+ for (final TableKey key : tables) {
+ LOG.debug("Send aggregate stats request for flow table {} to node {}", key.getId(), this.getNodeRef());
+ this.requestAggregateFlows(key);
+ }
+
+ this.requestAllFlowsAllTables();
+
+ }
+ public void requestAllFlowsAllTables() {
+ if (flowStatsService != null) {
+ final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input = new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
+ input.setNode(getNodeRef());
+
+ requestHelper(flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build()));
+ }
+ }
+
+ public void requestAggregateFlows(final TableKey key) {
+ if (flowStatsService != null) {
+ GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
+ new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
+
+ input.setNode(getNodeRef());
+ input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(key.getId()));
+ requestHelper(flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build()));
+ }
+ }
+
+ public void requestFlow(final Flow flow) {
+ if (flowStatsService != null) {
+ final GetFlowStatisticsFromFlowTableInputBuilder input =
+ new GetFlowStatisticsFromFlowTableInputBuilder(flow);
+ input.setNode(getNodeRef());
+
+ requestHelper(flowStatsService.getFlowStatisticsFromFlowTable(input.build()));
+ }
+ }
+
+ @Override
+ public void onDataChanged(final DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+ for (Entry<InstanceIdentifier<?>, DataObject> e : change.getCreatedConfigurationData().entrySet()) {
+ if (Flow.class.equals(e.getKey().getTargetType())) {
+ final Flow flow = (Flow) e.getValue();
+ LOG.debug("Key {} triggered request for flow {}", e.getKey(), flow);
+ requestFlow(flow);
+ } else {
+ LOG.debug("Ignoring key {}", e.getKey());
}
}
- table = (Table)trans.readOperationalData(tableRef);
- if(table != null){
- for(Flow existingFlow : table.getFlow()){
- FlowStatisticsData augmentedflowStatisticsData = existingFlow.getAugmentation(FlowStatisticsData.class);
- if(augmentedflowStatisticsData != null){
- FlowBuilder existingOperationalFlow = new FlowBuilder();
- existingOperationalFlow.fieldsFrom(augmentedflowStatisticsData.getFlowStatistics());
- logger.debug("Existing unaccounted flow in operational data store : {}",existingFlow.toString());
- if(FlowComparator.flowEquals(flowRule,existingOperationalFlow.build())){
- InstanceIdentifier<Flow> flowRef = getNodeIdentifierBuilder()
- .augmentation(FlowCapableNode.class)
- .child(Table.class, new TableKey(tableId))
- .child(Flow.class,existingFlow.getKey()).toInstance();
- flowBuilder.setKey(existingFlow.getKey());
- flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
- logger.debug("Found matching unaccounted flow in the operational datastore, augmenting statistics");
- // Update entry with timestamp of latest response
- flow.setKey(existingFlow.getKey());
- FlowStatsEntry flowStatsEntry = new FlowStatsEntry(tableId,flow.build());
- trans.putOperationalData(flowRef, flowBuilder.build());
- return flowStatsEntry;
+ final DataModificationTransaction trans = startTransaction();
+ for (InstanceIdentifier<?> key : change.getRemovedConfigurationData()) {
+ if (Flow.class.equals(key.getTargetType())) {
+ @SuppressWarnings("unchecked")
+ final InstanceIdentifier<Flow> flow = (InstanceIdentifier<Flow>)key;
+ LOG.debug("Key {} triggered remove of Flow from operational space.", key);
+ trans.removeOperationalData(flow);
+ }
+ }
+ trans.commit();
+ }
+
+ @Override
+ public void start(final DataBrokerService dbs) {
+ if (flowStatsService == null) {
+ LOG.debug("No Flow Statistics service, not subscribing to flows on node {}", getNodeIdentifier());
+ return;
+ }
+
+ super.start(dbs);
+ }
+
+ /* Returns Exist FlowKey from exist FlowCookieMap identified by cookie
+ * and by switch flow identification (priority and match)*/
+ private Optional<FlowKey> getExistFlowKey(final Flow flowRule, final InstanceIdentifier<Table> tableRef,
+ final DataModificationTransaction trans, final FlowCookieMap cookieMap) {
+
+ if (cookieMap != null) {
+ for (FlowId flowId : cookieMap.getFlowIds()) {
+ InstanceIdentifier<Flow> flowIdent = tableRef.child(Flow.class, new FlowKey(flowId));
+ if (flowId.getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
+ LOG.debug("Search for flow in the operational datastore by flowID: {} ", flowIdent);
+ Flow readedFlow = (Flow) trans.readOperationalData(flowIdent);
+ if (FlowComparator.flowEquals(flowRule, readedFlow)) {
+ return Optional.<FlowKey> of(new FlowKey(flowId));
+ }
+ } else {
+ LOG.debug("Search for flow in the configuration datastore by flowID: {} ", flowIdent);
+ Flow readedFlow = (Flow) trans.readConfigurationData(flowIdent);
+ if (FlowComparator.flowEquals(flowRule, readedFlow)) {
+ return Optional.<FlowKey> of(new FlowKey(flowId));
}
}
}
+ LOG.debug("Flow was not found in the datastore. Flow {} ", flowRule);
+ }
+ return Optional.absent();
+ }
+
+ /* Returns FlowKey from existing Flow in DataStore/CONFIGURATIONAL which is identified by cookie
+ * and by switch flow identification (priority and match) */
+ private Optional<FlowKey> getFlowKeyFromExistFlow(final Flow flowRule, final InstanceIdentifier<Table> tableRef,
+ final DataModificationTransaction trans) {
+
+ /* Try to find it in DataSotre/CONFIG */
+ Table table= (Table)trans.readConfigurationData(tableRef);
+ if(table != null) {
+ for(Flow existingFlow : table.getFlow()) {
+ LOG.debug("Existing flow in data store : {}",existingFlow.toString());
+ if(FlowComparator.flowEquals(flowRule,existingFlow)){
+ return Optional.<FlowKey> of(new FlowKey(existingFlow.getId()));
+ }
+ }
}
+ return Optional.absent();
+ }
+
+ /* Returns FlowKey which doesn't exist in any DataStore for now */
+ private Optional<FlowKey> makeAlienFlowKey(final Flow flowRule) {
- String flowKey = "#UF$TABLE*"+Short.toString(tableId)+"*"+Integer.toString(this.unaccountedFlowsCounter);
+ StringBuilder sBuilder = new StringBuilder(ALIEN_SYSTEM_FLOW_ID)
+ .append(flowRule.getTableId()).append("-").append(this.unaccountedFlowsCounter);
this.unaccountedFlowsCounter++;
- FlowKey newFlowKey = new FlowKey(new FlowId(flowKey));
- InstanceIdentifier<Flow> flowRef = getNodeIdentifierBuilder().augmentation(FlowCapableNode.class)
- .child(Table.class, new TableKey(tableId))
- .child(Flow.class,newFlowKey).toInstance();
- flowBuilder.setKey(newFlowKey);
- flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
- logger.debug("Flow {} is not present in config data store, augmenting statistics as an unaccounted flow",
- flowBuilder.build());
+ final FlowId flowId = new FlowId(sBuilder.toString());
+ return Optional.<FlowKey> of(new FlowKey(flowId));
+ }
- // Update entry with timestamp of latest response
- flow.setKey(newFlowKey);
- FlowStatsEntry flowStatsEntry = new FlowStatsEntry(tableId,flow.build());
- trans.putOperationalData(flowRef, flowBuilder.build());
- return flowStatsEntry;
+ /* Build new whole FlowCookieMap or add new flowKey */
+ private FlowCookieMap applyNewFlowKey(FlowCookieMap flowCookieMap, final Optional<FlowKey> flowKey,
+ final FlowCookie flowCookie) {
+ if (flowCookieMap != null) {
+ flowCookieMap.getFlowIds().add(flowKey.get().getId());
+ } else {
+ final FlowCookieMapBuilder flowCookieMapBuilder = new FlowCookieMapBuilder();
+ flowCookieMapBuilder.setCookie(flowCookie);
+ flowCookieMapBuilder.setFlowIds(Collections.singletonList(flowKey.get().getId()));
+ flowCookieMap = flowCookieMapBuilder.build();
+ }
+ return flowCookieMap;
}
}