Unit tests for ofoverlay
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / statistics / OFStatisticsManager.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics;
10
11 import java.util.ArrayList;
12 import java.util.HashSet;
13 import java.util.Iterator;
14 import java.util.List;
15 import java.util.Set;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.ScheduledFuture;
20 import java.util.concurrent.TimeUnit;
21
22 import javax.annotation.Nullable;
23
24 import org.apache.commons.lang3.tuple.Pair;
25 import org.opendaylight.groupbasedpolicy.api.StatisticsManager;
26 import org.opendaylight.groupbasedpolicy.dto.ConsEpgKey;
27 import org.opendaylight.groupbasedpolicy.dto.EpgKeyDto;
28 import org.opendaylight.groupbasedpolicy.dto.ProvEpgKey;
29 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics.flowcache.FlowCache;
30 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics.flowcache.FlowCacheFactory;
31 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics.util.FlowCacheCons;
32 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.statistics.util.IidSflowNameUtil;
33 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ContractId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.has.classifiers.Classifier;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.resolved.policies.ResolvedPolicy;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.resolved.policies.ResolvedPolicyKey;
39 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import com.google.common.base.Preconditions;
44 import com.google.common.collect.HashMultimap;
45 import com.google.common.collect.SetMultimap;
46
47 public class OFStatisticsManager implements AutoCloseable {
48
49     // key is String (not a full IpAddress) because
50     // we will get String from REST query to sFlow
51     private static ConcurrentMap<String, EndpointL3> endpointL3ByIpMap = new ConcurrentHashMap<>();
52     private static final int CONNECT_TIMEOUT_MILLISEC = 20000;
53     private static final int READ_TIMEOUT_MILLISEC = 30000;
54
55     private static final Logger LOG = LoggerFactory.getLogger(OFStatisticsManager.class);
56
57     private final ScheduledExecutorService executor;
58     private final StatisticsManager statisticsManager;
59     private final Set<String> flowCacheNames = new HashSet<>();
60     private static final SetMultimap<ContractId, Pair<ConsEpgKey, ProvEpgKey>> epgsByContractId = HashMultimap.create();
61     private List<ScheduledFuture<?>> collectStatsTasks = new ArrayList<>();
62
63     private static int MAX_FLOWS = 100;
64     private static double MIN_VALUE_IN_FLOW = 0.1;
65     private static final String AGG_MODE = "sum";
66     private Long delay;
67     private String sflowCollectorUri;
68
69     public OFStatisticsManager(ScheduledExecutorService executor, StatisticsManager statisticsManager) {
70         this.executor = executor;
71         this.statisticsManager = statisticsManager;
72     }
73
74     public synchronized void pullStatsForClassifier(InstanceIdentifier<Classifier> classifierIid,
75             Classifier classifier) {
76         Preconditions.checkNotNull(sflowCollectorUri);
77         Preconditions.checkNotNull(delay);
78         FlowCache flowCache = FlowCacheFactory.createFlowCache(classifierIid, classifier, FlowCacheCons.Value.BYTES);
79         setStatsPulling(flowCache, classifierIid);
80         flowCache = FlowCacheFactory.createFlowCache(classifierIid, classifier, FlowCacheCons.Value.FRAMES);
81         setStatsPulling(flowCache, classifierIid);
82     }
83
84     private void setStatsPulling(FlowCache flowCache, InstanceIdentifier<Classifier> classifierIid) {
85         if (flowCache == null) {
86             LOG.trace("Flow cache is null for classifier {}", classifierIid);
87             return;
88         }
89         ResolvedPolicyKey resolvedPolicyKey = classifierIid.firstKeyOf(ResolvedPolicy.class);
90         ConsEpgKey consEpgKey =
91                 new EpgKeyDto(resolvedPolicyKey.getConsumerEpgId(), resolvedPolicyKey.getConsumerTenantId());
92         ProvEpgKey provEpgKey =
93                 new EpgKeyDto(resolvedPolicyKey.getProviderEpgId(), resolvedPolicyKey.getProviderTenantId());
94         String flowCacheName = flowCache.getName();
95         ContractId contractId = IidSflowNameUtil.resolveContractIdFromFlowCacheName(flowCacheName);
96         epgsByContractId.put(contractId, Pair.of(consEpgKey, provEpgKey));
97         boolean isFlowCacheNew = flowCacheNames.add(flowCacheName);
98         if (isFlowCacheNew) {
99             SFlowRTConnection sFlowRTConnection = new SFlowRTConnection(executor, sflowCollectorUri, flowCache, new JsonRestClient(sflowCollectorUri, CONNECT_TIMEOUT_MILLISEC,
100                     READ_TIMEOUT_MILLISEC));
101             ScheduledFuture<?> collectStatsTask = this.executor.scheduleWithFixedDelay(new ReadGbpFlowCacheTask(flowCacheName, sFlowRTConnection,
102                     statisticsManager, MAX_FLOWS, MIN_VALUE_IN_FLOW, AGG_MODE), 0, delay, TimeUnit.SECONDS);
103             collectStatsTasks.add(collectStatsTask);
104         }
105     }
106
107     public synchronized static Set<Pair<ConsEpgKey, ProvEpgKey>> getEpgsForContract(ContractId contractId) {
108         return epgsByContractId.get(contractId);
109     }
110
111     public synchronized void setSflowCollectorUri(String sflowCollectorUri) {
112         this.sflowCollectorUri = sflowCollectorUri;
113     }
114
115     public synchronized void setDelay(Long delay) {
116         this.delay = delay;
117     }
118
119     public static EndpointL3 getEndpointL3ForIp(@Nullable String ipAddress) {
120         if (ipAddress == null) {
121             return null;
122         }
123         return endpointL3ByIpMap.get(ipAddress);
124     }
125
126     public static void addL3Endpoint(EndpointL3 endpointL3) {
127         endpointL3ByIpMap.put(getStringIpAddress(endpointL3.getIpAddress()), endpointL3);
128     }
129
130     public static void removeL3Endpoint(EndpointL3 endpointL3) {
131         endpointL3ByIpMap.remove(getStringIpAddress(endpointL3.getIpAddress()));
132     }
133
134     private static String getStringIpAddress(IpAddress ipAddress) {
135         if (ipAddress.getIpv4Address() != null) {
136             return ipAddress.getIpv4Address().getValue();
137         }
138         return ipAddress.getIpv6Address().getValue();
139     }
140
141     @Override
142     public synchronized void close() throws Exception {
143         Iterator<ScheduledFuture<?>> tasksIterator = collectStatsTasks.iterator();
144         while (tasksIterator.hasNext()) {
145             ScheduledFuture<?> scheduledFuture = tasksIterator.next();
146             scheduledFuture.cancel(false);
147             tasksIterator.remove();
148         }
149         epgsByContractId.clear();
150     }
151
152 }