/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics;

import java.math.BigInteger;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang3.tuple.Pair;
import org.opendaylight.groupbasedpolicy.api.StatisticsManager;
import org.opendaylight.groupbasedpolicy.dto.ConsEpgKey;
import org.opendaylight.groupbasedpolicy.dto.EpgKey;
import org.opendaylight.groupbasedpolicy.dto.EpgKeyDto;
import org.opendaylight.groupbasedpolicy.dto.ProvEpgKey;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics.flowcache.FlowCache;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics.flowcache.FlowCacheData;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics.util.FlowCacheCons;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics.util.IidSflowNameUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ContractId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.EndpointGroupId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.HasDirection.Direction;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.statistics.rev151215.statistic.records.StatRecords;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.statistics.rev151215.statistic.records.StatRecordsBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.statistics.rev151215.statistic.records.stat.records.EpToEpStatistic;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.statistics.rev151215.statistic.records.stat.records.EpToEpStatisticBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.statistics.rev151215.statistic.records.stat.records.ep.to.ep.statistic.EpEpgToEpEpgStatistic;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.statistics.rev151215.statistic.records.stat.records.ep.to.ep.statistic.EpEpgToEpEpgStatisticBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.statistics.rev151215.statistic.records.stat.records.ep.to.ep.statistic.ep.epg.to.ep.epg.statistic.MatchedRuleStatisticBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;
44 public class ProcessDataTask implements Runnable {
46 private static final Logger LOG = LoggerFactory.getLogger(ProcessDataTask.class);
48 private FlowCache flowCache;
49 private BigInteger timestamp;
50 private StatisticsManager statisticsManager;
51 List<FlowCacheData> dataList;
53 public ProcessDataTask(FlowCache flowCache, List<FlowCacheData> dataList, BigInteger timestamp,
54 StatisticsManager statisticsManager) {
55 this.flowCache = flowCache;
56 this.dataList = dataList;
57 this.timestamp = timestamp;
58 this.statisticsManager = statisticsManager;
63 for (FlowCacheData flowCacheData : dataList) {
64 Map<String, String> flowCacheDataMap = createFlowCacheDataMap(flowCacheData);
65 if (flowCacheDataMap == null) {
66 LOG.info("Stats are skipped for {}", flowCacheData);
69 String srcIp = flowCacheDataMap.get(FlowCacheCons.Key.IP_SOURCE.get());
70 String dstIp = flowCacheDataMap.get(FlowCacheCons.Key.IP_DESTINATION.get());
71 EndpointL3 srcEpL3 = OFStatisticsManager.getEndpointL3ForIp(srcIp);
72 EndpointL3 dstEpL3 = OFStatisticsManager.getEndpointL3ForIp(dstIp);
73 if (srcEpL3 != null && dstEpL3 != null) {
74 ContractId contractId = IidSflowNameUtil.resolveContractIdFromFlowCacheName(flowCache.getName());
75 MatchedRuleStatisticBuilder matchedRuleStatisticBuilder = new MatchedRuleStatisticBuilder()
76 .setContract(contractId)
77 .setSubject(IidSflowNameUtil.resolveSubjectNameFromFlowCacheName(flowCache.getName()))
78 .setMatchedRule(IidSflowNameUtil.resolveRuleNameFromFlowCacheName(flowCache.getName()))
79 .setClassifier(ImmutableList
80 .of(IidSflowNameUtil.resolveClassifierNameFromFlowCacheName(flowCache.getName())));
81 if (FlowCacheCons.Value.BYTES.get().equals(IidSflowNameUtil.resolveFlowCacheValue(flowCache.getName()))) {
82 matchedRuleStatisticBuilder.setByteCount(Math.round(flowCacheData.getValue()));
83 } else if (FlowCacheCons.Value.FRAMES.get().equals(IidSflowNameUtil.resolveFlowCacheValue(flowCache.getName()))) {
84 matchedRuleStatisticBuilder.setPacketCount(Math.round(flowCacheData.getValue()));
87 Set<Pair<ConsEpgKey, ProvEpgKey>> epgsForContract = OFStatisticsManager.getEpgsForContract(contractId);
88 Set<EpgKey> epgsFromSrcEp = getEpgsFromEndpoint(srcEpL3);
89 Set<EpgKey> epgsFromDstEp = getEpgsFromEndpoint(dstEpL3);
90 Pair<? extends EpgKey, ? extends EpgKey> leftSrcEpgRightDstEpg = getMatchingEpgs(epgsForContract, epgsFromSrcEp, epgsFromDstEp, flowCache.getDirection());
91 if (leftSrcEpgRightDstEpg == null) {
92 LOG.info("Stats are skipped for {}", flowCacheData);
96 EpEpgToEpEpgStatistic epEpgToEpEpgStats = new EpEpgToEpEpgStatisticBuilder()
97 .setSrcEpg(leftSrcEpgRightDstEpg.getLeft().getEpgId())
98 .setDstEpg(leftSrcEpgRightDstEpg.getRight().getEpgId())
99 .setMatchedRuleStatistic(ImmutableList.of(matchedRuleStatisticBuilder.build()))
102 EpToEpStatistic e2e = new EpToEpStatisticBuilder().setSrcL2c(srcEpL3.getL2Context())
103 .setSrcMacAddress(srcEpL3.getMacAddress())
104 .setSrcTenant(srcEpL3.getTenant())
105 .setDstL2c(dstEpL3.getL2Context())
106 .setDstMacAddress(dstEpL3.getMacAddress())
107 .setDstTenant(dstEpL3.getTenant())
108 .setEpEpgToEpEpgStatistic(ImmutableList.of(epEpgToEpEpgStats))
109 .setTimestamp(timestamp)
112 StatRecords statRecords = new StatRecordsBuilder().setEpToEpStatistic(ImmutableList.of(e2e)).build();
114 if (LOG.isTraceEnabled()) {
115 LOG.trace("[sflow] writing StatRecords: {}", statRecords);
117 statisticsManager.writeStat(statRecords);
122 private Set<EpgKey> getEpgsFromEndpoint(EndpointL3 epL3) {
123 Set<EpgKey> result = new HashSet<>();
124 TenantId tenantId = epL3.getTenant();
125 if (epL3.getEndpointGroup() != null) {
126 result.add(new EpgKeyDto(epL3.getEndpointGroup(), tenantId));
128 List<EndpointGroupId> epgs = epL3.getEndpointGroups();
130 for (EndpointGroupId epg : epgs) {
131 result.add(new EpgKeyDto(epg, tenantId));
137 private Pair<? extends EpgKey, ? extends EpgKey> getMatchingEpgs(Set<Pair<ConsEpgKey, ProvEpgKey>> epgsForContract,
138 Set<EpgKey> epgsFromSrcEp, Set<EpgKey> epgsFromDstEp, Direction direction) {
139 if (direction == null || Direction.Bidirectional == direction) {
140 LOG.info("The bidirectional direction is not supported.");
143 for (Pair<ConsEpgKey, ProvEpgKey> epgForContract : epgsForContract) {
144 ConsEpgKey consEpg = epgForContract.getLeft();
145 ProvEpgKey provEpg = epgForContract.getRight();
146 if (epgsFromSrcEp.contains(consEpg) && epgsFromDstEp.contains(provEpg)) {
147 if (Direction.In.equals(direction)) {
148 return Pair.of(consEpg, provEpg);
149 } else if (Direction.Out.equals(direction)) {
150 return Pair.of(provEpg, consEpg);
153 if (epgsFromSrcEp.contains(provEpg) && epgsFromDstEp.contains(consEpg)) {
154 if (Direction.In.equals(direction)) {
155 return Pair.of(consEpg, provEpg);
156 } else if (Direction.Out.equals(direction)) {
157 return Pair.of(provEpg, consEpg);
162 "EPGs of srcEP and dstEp does not match against EPGs for contract:"
163 + "\nsrcEP EPGs: {}\ndstEP EPGs: {}\nEPGs for contract: {}",
164 epgsFromSrcEp, epgsFromDstEp, epgsForContract);
168 private Map<String, String> createFlowCacheDataMap(FlowCacheData flowCacheData) {
169 String[] splitValues = flowCacheData.getKey().split(",");
170 if (splitValues.length != flowCache.getKeyNum()) {
172 "Key names and key values lists length do not match: {} != {}. Not processing.",
173 flowCache.getKeyNum(), splitValues.length);
176 Map<String, String> flowCacheDataMap = new HashMap<>();
177 for (int i = 0; i < flowCache.getKeyNum(); i++) {
178 flowCacheDataMap.put(flowCache.getKeyNames()[i], splitValues[i]);
180 return flowCacheDataMap;