/*
 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

import org.apache.commons.lang3.tuple.Pair;
import org.opendaylight.groupbasedpolicy.api.StatisticsManager;
import org.opendaylight.groupbasedpolicy.dto.ConsEpgKey;
import org.opendaylight.groupbasedpolicy.dto.EpgKeyDto;
import org.opendaylight.groupbasedpolicy.dto.ProvEpgKey;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics.flowcache.FlowCache;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics.flowcache.FlowCacheFactory;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics.util.FlowCacheCons;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics.util.IidSflowNameUtil;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ContractId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.has.classifiers.Classifier;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.resolved.policies.ResolvedPolicy;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.resolved.policies.ResolvedPolicyKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
47 public class OFStatisticsManager implements AutoCloseable {
49 // key is String (not a full IpAddress) because
50 // we will get String from REST query to sFlow
51 private static ConcurrentMap<String, EndpointL3> endpointL3ByIpMap = new ConcurrentHashMap<>();
52 private static final int CONNECT_TIMEOUT_MILLISEC = 20000;
53 private static final int READ_TIMEOUT_MILLISEC = 30000;
55 private static final Logger LOG = LoggerFactory.getLogger(OFStatisticsManager.class);
57 private final ScheduledExecutorService executor;
58 private final StatisticsManager statisticsManager;
59 private final Set<String> flowCacheNames = new HashSet<>();
60 private static final SetMultimap<ContractId, Pair<ConsEpgKey, ProvEpgKey>> epgsByContractId = HashMultimap.create();
61 private List<ScheduledFuture<?>> collectStatsTasks = new ArrayList<>();
63 private static int MAX_FLOWS = 100;
64 private static double MIN_VALUE_IN_FLOW = 0.1;
65 private static final String AGG_MODE = "sum";
67 private String sflowCollectorUri;
69 public OFStatisticsManager(ScheduledExecutorService executor, StatisticsManager statisticsManager) {
70 this.executor = executor;
71 this.statisticsManager = statisticsManager;
74 public synchronized void pullStatsForClassifier(InstanceIdentifier<Classifier> classifierIid,
75 Classifier classifier) {
76 Preconditions.checkNotNull(sflowCollectorUri);
77 Preconditions.checkNotNull(delay);
78 FlowCache flowCache = FlowCacheFactory.createFlowCache(classifierIid, classifier, FlowCacheCons.Value.BYTES);
79 setStatsPulling(flowCache, classifierIid);
80 flowCache = FlowCacheFactory.createFlowCache(classifierIid, classifier, FlowCacheCons.Value.FRAMES);
81 setStatsPulling(flowCache, classifierIid);
84 private void setStatsPulling(FlowCache flowCache, InstanceIdentifier<Classifier> classifierIid) {
85 if (flowCache == null) {
86 LOG.trace("Flow cache is null for classifier {}", classifierIid);
89 ResolvedPolicyKey resolvedPolicyKey = classifierIid.firstKeyOf(ResolvedPolicy.class);
90 ConsEpgKey consEpgKey =
91 new EpgKeyDto(resolvedPolicyKey.getConsumerEpgId(), resolvedPolicyKey.getConsumerTenantId());
92 ProvEpgKey provEpgKey =
93 new EpgKeyDto(resolvedPolicyKey.getProviderEpgId(), resolvedPolicyKey.getProviderTenantId());
94 String flowCacheName = flowCache.getName();
95 ContractId contractId = IidSflowNameUtil.resolveContractIdFromFlowCacheName(flowCacheName);
96 epgsByContractId.put(contractId, Pair.of(consEpgKey, provEpgKey));
97 boolean isFlowCacheNew = flowCacheNames.add(flowCacheName);
99 SFlowRTConnection sFlowRTConnection = new SFlowRTConnection(executor, sflowCollectorUri, flowCache, new JsonRestClient(sflowCollectorUri, CONNECT_TIMEOUT_MILLISEC,
100 READ_TIMEOUT_MILLISEC));
101 ScheduledFuture<?> collectStatsTask = this.executor.scheduleWithFixedDelay(new ReadGbpFlowCacheTask(flowCacheName, sFlowRTConnection,
102 statisticsManager, MAX_FLOWS, MIN_VALUE_IN_FLOW, AGG_MODE), 0, delay, TimeUnit.SECONDS);
103 collectStatsTasks.add(collectStatsTask);
107 public synchronized static Set<Pair<ConsEpgKey, ProvEpgKey>> getEpgsForContract(ContractId contractId) {
108 return epgsByContractId.get(contractId);
111 public synchronized void setSflowCollectorUri(String sflowCollectorUri) {
112 this.sflowCollectorUri = sflowCollectorUri;
115 public synchronized void setDelay(Long delay) {
119 public static EndpointL3 getEndpointL3ForIp(@Nullable String ipAddress) {
120 if (ipAddress == null) {
123 return endpointL3ByIpMap.get(ipAddress);
126 public static void addL3Endpoint(EndpointL3 endpointL3) {
127 endpointL3ByIpMap.put(getStringIpAddress(endpointL3.getIpAddress()), endpointL3);
130 public static void removeL3Endpoint(EndpointL3 endpointL3) {
131 endpointL3ByIpMap.remove(getStringIpAddress(endpointL3.getIpAddress()));
134 private static String getStringIpAddress(IpAddress ipAddress) {
135 if (ipAddress.getIpv4Address() != null) {
136 return ipAddress.getIpv4Address().getValue();
138 return ipAddress.getIpv6Address().getValue();
142 public synchronized void close() throws Exception {
143 Iterator<ScheduledFuture<?>> tasksIterator = collectStatsTasks.iterator();
144 while (tasksIterator.hasNext()) {
145 ScheduledFuture<?> scheduledFuture = tasksIterator.next();
146 scheduledFuture.cancel(false);
147 tasksIterator.remove();
149 epgsByContractId.clear();