2 * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netvirt.qosservice;
11 import static org.opendaylight.controller.md.sal.binding.api.WriteTransaction.CREATE_MISSING_PARENTS;
12 import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
14 import java.util.List;
15 import java.util.Map.Entry;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ConcurrentMap;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Future;
21 import java.util.function.Supplier;
22 import javax.annotation.PostConstruct;
23 import javax.annotation.PreDestroy;
24 import javax.inject.Inject;
25 import javax.inject.Singleton;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
28 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
29 import org.opendaylight.genius.interfacemanager.globals.IfmConstants;
30 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
31 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
32 import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.OpendaylightDirectStatisticsService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.qosalert.config.rev170301.QosalertConfig;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.qosalert.config.rev170301.QosalertConfigBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.ports.rev150712.ports.attributes.ports.Port;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.opendaylight.yangtools.yang.common.RpcResult;
47 import org.opendaylight.yangtools.yang.common.Uint64;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
53 public final class QosAlertManager implements Runnable {
54 private static final Logger LOG = LoggerFactory.getLogger(QosAlertManager.class);
56 private volatile boolean alertEnabled;
57 private volatile int pollInterval;
58 private volatile Thread thread;
59 private volatile boolean statsPollThreadStart;
61 private final ManagedNewTransactionRunner txRunner;
62 private final QosalertConfig defaultConfig;
63 private final OpendaylightDirectStatisticsService odlDirectStatisticsService;
64 private final QosNeutronUtils qosNeutronUtils;
65 private final QosEosHandler qosEosHandler;
66 private final IInterfaceManager interfaceManager;
67 private final Set unprocessedInterfaceIds = ConcurrentHashMap.newKeySet();
68 private final ConcurrentMap<Uint64, ConcurrentMap<String, QosAlertPortData>> qosAlertDpnPortNumberMap =
69 new ConcurrentHashMap<>();
70 private final AlertThresholdSupplier alertThresholdSupplier = new AlertThresholdSupplier();
73 public QosAlertManager(final DataBroker dataBroker,
74 final OpendaylightDirectStatisticsService odlDirectStatisticsService, final QosalertConfig defaultConfig,
75 final QosNeutronUtils qosNeutronUtils, final QosEosHandler qosEosHandler,
76 final IInterfaceManager interfaceManager) {
77 this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
78 this.odlDirectStatisticsService = odlDirectStatisticsService;
79 this.interfaceManager = interfaceManager;
80 this.defaultConfig = defaultConfig;
81 this.qosNeutronUtils = qosNeutronUtils;
82 this.qosEosHandler = qosEosHandler;
83 LOG.trace("QosAlert default config poll alertEnabled:{} threshold:{} pollInterval:{}",
84 defaultConfig.isQosAlertEnabled(), defaultConfig.getQosDropPacketThreshold(),
85 defaultConfig.getQosAlertPollInterval());
91 qosEosHandler.addLocalOwnershipChangedListener(this::setQosAlertOwner);
92 qosAlertDpnPortNumberMap.clear();
93 statsPollThreadStart = true;
94 startStatsPollThread();
95 LOG.trace("{} init done", getClass().getSimpleName());
100 statsPollThreadStart = false;
101 if (thread != null) {
104 LOG.trace("{} close done", getClass().getSimpleName());
107 private void setQosAlertOwner(boolean isOwner) {
108 LOG.trace("qos alert set owner : {}", isOwner);
109 statsPollThreadStart = isOwner;
110 if (thread != null) {
113 startStatsPollThread();
119 LOG.debug("Qos alert poll thread started");
120 while (statsPollThreadStart && alertEnabled) {
121 LOG.trace("Thread loop polling :{} threshold:{} pollInterval:{}",
122 alertEnabled, alertThresholdSupplier.get(), pollInterval);
125 pollDirectStatisticsForAllNodes();
126 Thread.sleep(pollInterval * 60L * 1000L); // pollInterval in minutes
127 } catch (final InterruptedException e) {
128 LOG.debug("Qos polling thread interrupted");
132 LOG.debug("Qos alert poll thread stopped");
135 private void startStatsPollThread() {
136 if (statsPollThreadStart && alertEnabled && thread == null) {
138 thread = new Thread(this);
139 thread.setDaemon(true);
144 private void getDefaultConfig() {
145 alertEnabled = defaultConfig.isQosAlertEnabled();
146 pollInterval = defaultConfig.getQosAlertPollInterval().toJava();
148 alertThresholdSupplier.set(defaultConfig.getQosDropPacketThreshold().toJava());
151 public void setQosalertConfig(QosalertConfig config) {
153 LOG.debug("New QoS alert config threshold:{} polling alertEnabled:{} interval:{}",
154 config.getQosDropPacketThreshold(), config.isQosAlertEnabled(),
155 config.getQosAlertPollInterval());
157 alertEnabled = config.isQosAlertEnabled().booleanValue();
158 pollInterval = config.getQosAlertPollInterval().toJava();
160 alertThresholdSupplier.set(config.getQosDropPacketThreshold().shortValue());
162 if (thread != null) {
165 startStatsPollThread();
170 public void restoreDefaultConfig() {
171 LOG.debug("Restoring default configuration");
173 if (thread != null) {
176 startStatsPollThread();
180 public void setThreshold(short threshold) {
181 LOG.debug("setting threshold {} in config data store", threshold);
182 writeConfigDataStore(alertEnabled, threshold, pollInterval);
185 public void setPollInterval(int pollInterval) {
186 LOG.debug("setting interval {} in config data store", pollInterval);
187 writeConfigDataStore(alertEnabled, alertThresholdSupplier.get().shortValue(), pollInterval);
190 public void setEnable(boolean enable) {
191 LOG.debug("setting QoS poll to {} in config data store", enable);
192 writeConfigDataStore(enable, alertThresholdSupplier.get().shortValue(), pollInterval);
195 public void addInterfaceIdInQoSAlertCache(String ifaceId) {
196 LOG.trace("Adding interface id {} in cache", ifaceId);
197 InterfaceInfo interfaceInfo =
198 interfaceManager.getInterfaceInfoFromOperationalDataStore(ifaceId);
199 if (interfaceInfo == null) {
200 LOG.debug("Interface not found {}. Added in cache now to process later ", ifaceId);
201 unprocessedInterfaceIds.add(ifaceId);
203 addToQosAlertCache(interfaceInfo);
207 public void processInterfaceUpEvent(String ifaceId) {
208 LOG.trace("processInterfaceUpEvent {}", ifaceId);
209 if (unprocessedInterfaceIds.remove(ifaceId)) {
210 addInterfaceIdInQoSAlertCache(ifaceId);
214 private void addToQosAlertCache(InterfaceInfo interfaceInfo) {
215 Uint64 dpnId = interfaceInfo.getDpId();
216 if (dpnId.equals(Uint64.valueOf(0L))) {
217 LOG.warn("Interface {} could not be added to Qos Alert Cache because Dpn Id is not found",
218 interfaceInfo.getInterfaceName());
222 Port port = qosNeutronUtils.getNeutronPort(interfaceInfo.getInterfaceName());
224 LOG.warn("Port {} not added to Qos Alert Cache because it is not found", interfaceInfo.getInterfaceName());
228 String portNumber = String.valueOf(interfaceInfo.getPortNo());
230 LOG.trace("Adding DPN ID {} with port {} port number {}", dpnId, port.getUuid(), portNumber);
232 qosAlertDpnPortNumberMap.computeIfAbsent(dpnId, key -> new ConcurrentHashMap<>())
233 .put(portNumber, new QosAlertPortData(port, qosNeutronUtils, alertThresholdSupplier));
236 public void removeInterfaceIdFromQosAlertCache(String ifaceId) {
238 LOG.trace("If present, remove interface {} from cache", ifaceId);
239 unprocessedInterfaceIds.remove(ifaceId);
240 InterfaceInfo interfaceInfo =
241 interfaceManager.getInterfaceInfoFromOperationalDataStore(ifaceId);
242 if (interfaceInfo == null) {
245 Uint64 dpnId = interfaceInfo.getDpId();
246 String portNumber = String.valueOf(interfaceInfo.getPortNo());
247 removeFromQosAlertCache(dpnId, portNumber);
250 public void removeLowerLayerIfFromQosAlertCache(String lowerLayerIf) {
251 LOG.trace("If present, remove lowerLayerIf {} from cache", lowerLayerIf);
252 Uint64 dpnId = qosNeutronUtils.getDpnIdFromLowerLayerIf(lowerLayerIf);
253 String portNumber = qosNeutronUtils.getPortNumberFromLowerLayerIf(lowerLayerIf);
254 if (dpnId == null || portNumber == null) {
255 LOG.warn("Interface {} not in openflow:dpnid:portnum format, could not remove from cache", lowerLayerIf);
258 removeFromQosAlertCache(dpnId, portNumber);
261 private void removeFromQosAlertCache(Uint64 dpnId, String portNumber) {
262 if (qosAlertDpnPortNumberMap.containsKey(dpnId)
263 && qosAlertDpnPortNumberMap.get(dpnId).containsKey(portNumber)) {
264 qosAlertDpnPortNumberMap.get(dpnId).remove(portNumber);
265 LOG.trace("Removed interace {}:{} from cache", dpnId, portNumber);
266 if (qosAlertDpnPortNumberMap.get(dpnId).isEmpty()) {
267 LOG.trace("DPN {} empty. Removing dpn from cache", dpnId);
268 qosAlertDpnPortNumberMap.remove(dpnId);
273 private void writeConfigDataStore(boolean qosAlertEnabled, short dropPacketThreshold, int alertPollInterval) {
275 InstanceIdentifier<QosalertConfig> path = InstanceIdentifier.builder(QosalertConfig.class).build();
277 QosalertConfig qosAlertConfig = new QosalertConfigBuilder()
278 .setQosDropPacketThreshold(dropPacketThreshold)
279 .setQosAlertEnabled(qosAlertEnabled)
280 .setQosAlertPollInterval(alertPollInterval)
283 ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
284 tx -> tx.put(path, qosAlertConfig,
285 CREATE_MISSING_PARENTS)), LOG, "Error writing to the config data store");
288 private void pollDirectStatisticsForAllNodes() {
289 LOG.trace("Polling direct statistics from nodes");
291 for (Entry<Uint64, ConcurrentMap<String, QosAlertPortData>> entry : qosAlertDpnPortNumberMap.entrySet()) {
292 Uint64 dpn = entry.getKey();
293 LOG.trace("Polling DPN ID {}", dpn);
294 GetNodeConnectorStatisticsInputBuilder input = new GetNodeConnectorStatisticsInputBuilder()
295 .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
296 .child(Node.class, new NodeKey(new NodeId(IfmConstants.OF_URI_PREFIX + dpn))).build()))
297 .setStoreStats(false);
298 Future<RpcResult<GetNodeConnectorStatisticsOutput>> rpcResultFuture =
299 odlDirectStatisticsService.getNodeConnectorStatistics(input.build());
301 RpcResult<GetNodeConnectorStatisticsOutput> rpcResult = null;
303 rpcResult = rpcResultFuture.get();
304 } catch (InterruptedException | ExecutionException e) {
305 if (LOG.isDebugEnabled()) {
306 LOG.debug("Could not get Direct-Statistics for node {} Exception occurred ", dpn, e);
308 LOG.info("Could not get Direct-Statistics for node {}", dpn);
311 if (rpcResult != null && rpcResult.isSuccessful() && rpcResult.getResult() != null) {
313 GetNodeConnectorStatisticsOutput nodeConnectorStatisticsOutput = rpcResult.getResult();
315 List<NodeConnectorStatisticsAndPortNumberMap> nodeConnectorStatisticsAndPortNumberMapList =
316 nodeConnectorStatisticsOutput.getNodeConnectorStatisticsAndPortNumberMap();
318 ConcurrentMap<String, QosAlertPortData> portDataMap = entry.getValue();
319 for (NodeConnectorStatisticsAndPortNumberMap stats : nodeConnectorStatisticsAndPortNumberMapList) {
320 QosAlertPortData portData = portDataMap.get(stats.getNodeConnectorId().getValue());
321 if (portData != null) {
322 portData.updatePortStatistics(stats);
326 LOG.info("Direct-Statistics not available for node {}", dpn);
332 private void initPortStatsData() {
333 qosAlertDpnPortNumberMap.values().forEach(portDataMap -> portDataMap.values()
334 .forEach(QosAlertPortData::initPortData));
337 private static class AlertThresholdSupplier implements Supplier<Uint64> {
338 private volatile Uint64 alertThreshold = Uint64.valueOf(0);
340 void set(short threshold) {
341 alertThreshold = Uint64.valueOf(threshold);
345 public Uint64 get() {
346 return alertThreshold;