Bug 4988: OF statistics & REST client
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / statistics / ProcessDataTask.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics;
9
10 import java.math.BigInteger;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Set;
16
17 import org.apache.commons.lang3.tuple.Pair;
18 import org.opendaylight.groupbasedpolicy.api.StatisticsManager;
19 import org.opendaylight.groupbasedpolicy.dto.ConsEpgKey;
20 import org.opendaylight.groupbasedpolicy.dto.EpgKey;
21 import org.opendaylight.groupbasedpolicy.dto.EpgKeyDto;
22 import org.opendaylight.groupbasedpolicy.dto.ProvEpgKey;
23 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics.flowcache.FlowCache;
24 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics.flowcache.FlowCacheData;
25 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics.util.FlowCacheCons;
26 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics.util.IidSflowNameUtil;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ContractId;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.EndpointGroupId;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.HasDirection.Direction;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.statistics.rev151215.statistic.records.StatRecords;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.statistics.rev151215.statistic.records.StatRecordsBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.statistics.rev151215.statistic.records.stat.records.EpToEpStatistic;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.statistics.rev151215.statistic.records.stat.records.EpToEpStatisticBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.statistics.rev151215.statistic.records.stat.records.ep.to.ep.statistic.EpEpgToEpEpgStatistic;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.statistics.rev151215.statistic.records.stat.records.ep.to.ep.statistic.EpEpgToEpEpgStatisticBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.statistics.rev151215.statistic.records.stat.records.ep.to.ep.statistic.ep.epg.to.ep.epg.statistic.MatchedRuleStatisticBuilder;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import com.google.common.collect.ImmutableList;
43
44 public class ProcessDataTask implements Runnable {
45
46     private static final Logger LOG = LoggerFactory.getLogger(ProcessDataTask.class);
47
48     private FlowCache flowCache;
49     private BigInteger timestamp;
50     private StatisticsManager statisticsManager;
51     List<FlowCacheData> dataList;
52
53     public ProcessDataTask(FlowCache flowCache, List<FlowCacheData> dataList, BigInteger timestamp,
54             StatisticsManager statisticsManager) {
55         this.flowCache = flowCache;
56         this.dataList = dataList;
57         this.timestamp = timestamp;
58         this.statisticsManager = statisticsManager;
59     }
60
61     @Override
62     public void run() {
63         for (FlowCacheData flowCacheData : dataList) {
64             Map<String, String> flowCacheDataMap = createFlowCacheDataMap(flowCacheData);
65             if (flowCacheDataMap == null) {
66                 LOG.info("Stats are skipped for {}", flowCacheData);
67                 continue;
68             }
69             String srcIp = flowCacheDataMap.get(FlowCacheCons.Key.IP_SOURCE.get());
70             String dstIp = flowCacheDataMap.get(FlowCacheCons.Key.IP_DESTINATION.get());
71             EndpointL3 srcEpL3 = OFStatisticsManager.getEndpointL3ForIp(srcIp);
72             EndpointL3 dstEpL3 = OFStatisticsManager.getEndpointL3ForIp(dstIp);
73             if (srcEpL3 != null && dstEpL3 != null) {
74                 ContractId contractId = IidSflowNameUtil.resolveContractIdFromFlowCacheName(flowCache.getName());
75                 MatchedRuleStatisticBuilder matchedRuleStatisticBuilder = new MatchedRuleStatisticBuilder()
76                 .setContract(contractId)
77                 .setSubject(IidSflowNameUtil.resolveSubjectNameFromFlowCacheName(flowCache.getName()))
78                 .setMatchedRule(IidSflowNameUtil.resolveRuleNameFromFlowCacheName(flowCache.getName()))
79                 .setClassifier(ImmutableList
80                     .of(IidSflowNameUtil.resolveClassifierNameFromFlowCacheName(flowCache.getName())));
81                 if (FlowCacheCons.Value.BYTES.get().equals(IidSflowNameUtil.resolveFlowCacheValue(flowCache.getName()))) {
82                     matchedRuleStatisticBuilder.setByteCount(Math.round(flowCacheData.getValue()));
83                 } else if (FlowCacheCons.Value.FRAMES.get().equals(IidSflowNameUtil.resolveFlowCacheValue(flowCache.getName()))) {
84                     matchedRuleStatisticBuilder.setPacketCount(Math.round(flowCacheData.getValue()));
85                 }
86
87                 Set<Pair<ConsEpgKey, ProvEpgKey>> epgsForContract = OFStatisticsManager.getEpgsForContract(contractId);
88                 Set<EpgKey> epgsFromSrcEp = getEpgsFromEndpoint(srcEpL3);
89                 Set<EpgKey> epgsFromDstEp = getEpgsFromEndpoint(dstEpL3);
90                 Pair<? extends EpgKey, ? extends EpgKey> leftSrcEpgRightDstEpg = getMatchingEpgs(epgsForContract, epgsFromSrcEp, epgsFromDstEp, flowCache.getDirection());
91                 if (leftSrcEpgRightDstEpg == null) {
92                     LOG.info("Stats are skipped for {}", flowCacheData);
93                     continue;
94                 }
95
96                 EpEpgToEpEpgStatistic epEpgToEpEpgStats = new EpEpgToEpEpgStatisticBuilder()
97                     .setSrcEpg(leftSrcEpgRightDstEpg.getLeft().getEpgId())
98                     .setDstEpg(leftSrcEpgRightDstEpg.getRight().getEpgId())
99                     .setMatchedRuleStatistic(ImmutableList.of(matchedRuleStatisticBuilder.build()))
100                     .build();
101
102                 EpToEpStatistic e2e = new EpToEpStatisticBuilder().setSrcL2c(srcEpL3.getL2Context())
103                     .setSrcMacAddress(srcEpL3.getMacAddress())
104                     .setSrcTenant(srcEpL3.getTenant())
105                     .setDstL2c(dstEpL3.getL2Context())
106                     .setDstMacAddress(dstEpL3.getMacAddress())
107                     .setDstTenant(dstEpL3.getTenant())
108                     .setEpEpgToEpEpgStatistic(ImmutableList.of(epEpgToEpEpgStats))
109                     .setTimestamp(timestamp)
110                     .build();
111
112                 StatRecords statRecords = new StatRecordsBuilder().setEpToEpStatistic(ImmutableList.of(e2e)).build();
113
114                 if (LOG.isTraceEnabled()) {
115                     LOG.trace("[sflow] writing StatRecords: {}", statRecords);
116                 }
117                 statisticsManager.writeStat(statRecords);
118             }
119         }
120     }
121
122     private Set<EpgKey> getEpgsFromEndpoint(EndpointL3 epL3) {
123         Set<EpgKey> result = new HashSet<>();
124         TenantId tenantId = epL3.getTenant();
125         if (epL3.getEndpointGroup() != null) {
126             result.add(new EpgKeyDto(epL3.getEndpointGroup(), tenantId));
127         }
128         List<EndpointGroupId> epgs = epL3.getEndpointGroups();
129         if (epgs != null) {
130             for (EndpointGroupId epg : epgs) {
131                 result.add(new EpgKeyDto(epg, tenantId));
132             }
133         }
134         return result;
135     }
136
137     private Pair<? extends EpgKey, ? extends EpgKey> getMatchingEpgs(Set<Pair<ConsEpgKey, ProvEpgKey>> epgsForContract,
138             Set<EpgKey> epgsFromSrcEp, Set<EpgKey> epgsFromDstEp, Direction direction) {
139         if (direction == null || Direction.Bidirectional == direction) {
140             LOG.info("The bidirectional direction is not supported.");
141             return null;
142         }
143         for (Pair<ConsEpgKey, ProvEpgKey> epgForContract : epgsForContract) {
144             ConsEpgKey consEpg = epgForContract.getLeft();
145             ProvEpgKey provEpg = epgForContract.getRight();
146             if (epgsFromSrcEp.contains(consEpg) && epgsFromDstEp.contains(provEpg)) {
147                 if (Direction.In.equals(direction)) {
148                     return Pair.of(consEpg, provEpg);
149                 } else if (Direction.Out.equals(direction)) {
150                     return Pair.of(provEpg, consEpg);
151                 }
152             }
153             if (epgsFromSrcEp.contains(provEpg) && epgsFromDstEp.contains(consEpg)) {
154                 if (Direction.In.equals(direction)) {
155                     return Pair.of(consEpg, provEpg);
156                 } else if (Direction.Out.equals(direction)) {
157                     return Pair.of(provEpg, consEpg);
158                 }
159             }
160         }
161         LOG.info(
162                 "EPGs of srcEP and dstEp does not match against EPGs for contract:"
163                         + "\nsrcEP EPGs: {}\ndstEP EPGs: {}\nEPGs for contract: {}",
164                 epgsFromSrcEp, epgsFromDstEp, epgsForContract);
165         return null;
166     }
167
168     private Map<String, String> createFlowCacheDataMap(FlowCacheData flowCacheData) {
169         String[] splitValues = flowCacheData.getKey().split(",");
170         if (splitValues.length != flowCache.getKeyNum()) {
171             LOG.error(
172                     "Key names and key values lists length do not match: {} != {}. Not processing.",
173                     flowCache.getKeyNum(), splitValues.length);
174             return null;
175         }
176         Map<String, String> flowCacheDataMap = new HashMap<>();
177         for (int i = 0; i < flowCache.getKeyNum(); i++) {
178             flowCacheDataMap.put(flowCache.getKeyNames()[i], splitValues[i]);
179         }
180         return flowCacheDataMap;
181     }
182
183 }