Bump odlparent->6.0.0,mdsal->5.0.3
[netvirt.git] / qosservice / impl / src / main / java / org / opendaylight / netvirt / qosservice / QosAlertManager.java
1 /*
2  * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netvirt.qosservice;
10
11 import static org.opendaylight.controller.md.sal.binding.api.WriteTransaction.CREATE_MISSING_PARENTS;
12 import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
13
14 import java.util.List;
15 import java.util.Map.Entry;
16 import java.util.Set;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ConcurrentMap;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Future;
21 import java.util.function.Supplier;
22 import javax.annotation.PostConstruct;
23 import javax.annotation.PreDestroy;
24 import javax.inject.Inject;
25 import javax.inject.Singleton;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
28 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
29 import org.opendaylight.genius.interfacemanager.globals.IfmConstants;
30 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
31 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
32 import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.OpendaylightDirectStatisticsService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.qosalert.config.rev170301.QosalertConfig;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.qosalert.config.rev170301.QosalertConfigBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.ports.rev150712.ports.attributes.ports.Port;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.opendaylight.yangtools.yang.common.RpcResult;
47 import org.opendaylight.yangtools.yang.common.Uint64;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51
52 @Singleton
53 public final class QosAlertManager implements Runnable {
54     private static final Logger LOG = LoggerFactory.getLogger(QosAlertManager.class);
55
56     private volatile boolean alertEnabled;
57     private volatile int pollInterval;
58     private volatile Thread thread;
59     private volatile boolean statsPollThreadStart;
60
61     private final ManagedNewTransactionRunner txRunner;
62     private final QosalertConfig defaultConfig;
63     private final OpendaylightDirectStatisticsService odlDirectStatisticsService;
64     private final QosNeutronUtils qosNeutronUtils;
65     private final QosEosHandler qosEosHandler;
66     private final IInterfaceManager interfaceManager;
67     private final Set unprocessedInterfaceIds = ConcurrentHashMap.newKeySet();
68     private final ConcurrentMap<Uint64, ConcurrentMap<String, QosAlertPortData>> qosAlertDpnPortNumberMap =
69             new ConcurrentHashMap<>();
70     private final AlertThresholdSupplier alertThresholdSupplier = new AlertThresholdSupplier();
71
72     @Inject
73     public QosAlertManager(final DataBroker dataBroker,
74             final OpendaylightDirectStatisticsService odlDirectStatisticsService, final QosalertConfig defaultConfig,
75             final QosNeutronUtils qosNeutronUtils, final QosEosHandler qosEosHandler,
76             final IInterfaceManager interfaceManager) {
77         this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
78         this.odlDirectStatisticsService = odlDirectStatisticsService;
79         this.interfaceManager = interfaceManager;
80         this.defaultConfig = defaultConfig;
81         this.qosNeutronUtils = qosNeutronUtils;
82         this.qosEosHandler = qosEosHandler;
83         LOG.trace("QosAlert default config poll alertEnabled:{} threshold:{} pollInterval:{}",
84                 defaultConfig.isQosAlertEnabled(), defaultConfig.getQosDropPacketThreshold(),
85                 defaultConfig.getQosAlertPollInterval());
86         getDefaultConfig();
87     }
88
89     @PostConstruct
90     public void init() {
91         qosEosHandler.addLocalOwnershipChangedListener(this::setQosAlertOwner);
92         qosAlertDpnPortNumberMap.clear();
93         statsPollThreadStart = true;
94         startStatsPollThread();
95         LOG.trace("{} init done", getClass().getSimpleName());
96     }
97
98     @PreDestroy
99     public void close() {
100         statsPollThreadStart = false;
101         if (thread != null) {
102             thread.interrupt();
103         }
104         LOG.trace("{} close done", getClass().getSimpleName());
105     }
106
107     private void setQosAlertOwner(boolean isOwner) {
108         LOG.trace("qos alert set owner : {}", isOwner);
109         statsPollThreadStart = isOwner;
110         if (thread != null) {
111             thread.interrupt();
112         } else {
113             startStatsPollThread();
114         }
115     }
116
117     @Override
118     public void run() {
119         LOG.debug("Qos alert poll thread started");
120         while (statsPollThreadStart && alertEnabled) {
121             LOG.trace("Thread loop polling :{} threshold:{} pollInterval:{}",
122                     alertEnabled, alertThresholdSupplier.get(), pollInterval);
123
124             try {
125                 pollDirectStatisticsForAllNodes();
126                 Thread.sleep(pollInterval * 60L * 1000L); // pollInterval in minutes
127             } catch (final InterruptedException e) {
128                 LOG.debug("Qos polling thread interrupted");
129             }
130         }
131         thread = null;
132         LOG.debug("Qos alert poll thread stopped");
133     }
134
135     private void startStatsPollThread() {
136         if (statsPollThreadStart && alertEnabled && thread == null) {
137             initPortStatsData();
138             thread = new Thread(this);
139             thread.setDaemon(true);
140             thread.start();
141         }
142     }
143
144     private void getDefaultConfig() {
145         alertEnabled = defaultConfig.isQosAlertEnabled();
146         pollInterval = defaultConfig.getQosAlertPollInterval().toJava();
147
148         alertThresholdSupplier.set(defaultConfig.getQosDropPacketThreshold().toJava());
149     }
150
151     public void setQosalertConfig(QosalertConfig config) {
152
153         LOG.debug("New QoS alert config threshold:{} polling alertEnabled:{} interval:{}",
154                 config.getQosDropPacketThreshold(), config.isQosAlertEnabled(),
155                 config.getQosAlertPollInterval());
156
157         alertEnabled = config.isQosAlertEnabled().booleanValue();
158         pollInterval = config.getQosAlertPollInterval().toJava();
159
160         alertThresholdSupplier.set(config.getQosDropPacketThreshold().shortValue());
161
162         if (thread != null) {
163             thread.interrupt();
164         } else {
165             startStatsPollThread();
166         }
167
168     }
169
170     public void restoreDefaultConfig() {
171         LOG.debug("Restoring default configuration");
172         getDefaultConfig();
173         if (thread != null) {
174             thread.interrupt();
175         } else {
176             startStatsPollThread();
177         }
178     }
179
180     public void setThreshold(short threshold) {
181         LOG.debug("setting threshold {} in config data store", threshold);
182         writeConfigDataStore(alertEnabled, threshold, pollInterval);
183     }
184
185     public void setPollInterval(int pollInterval) {
186         LOG.debug("setting interval {} in config data store", pollInterval);
187         writeConfigDataStore(alertEnabled, alertThresholdSupplier.get().shortValue(), pollInterval);
188     }
189
190     public void setEnable(boolean enable) {
191         LOG.debug("setting QoS poll to {} in config data store", enable);
192         writeConfigDataStore(enable, alertThresholdSupplier.get().shortValue(), pollInterval);
193     }
194
195     public void addInterfaceIdInQoSAlertCache(String ifaceId) {
196         LOG.trace("Adding interface id {} in cache", ifaceId);
197         InterfaceInfo interfaceInfo =
198                 interfaceManager.getInterfaceInfoFromOperationalDataStore(ifaceId);
199         if (interfaceInfo == null) {
200             LOG.debug("Interface not found {}. Added in cache now to process later ", ifaceId);
201             unprocessedInterfaceIds.add(ifaceId);
202         } else {
203             addToQosAlertCache(interfaceInfo);
204         }
205     }
206
207     public void processInterfaceUpEvent(String ifaceId) {
208         LOG.trace("processInterfaceUpEvent {}", ifaceId);
209         if (unprocessedInterfaceIds.remove(ifaceId)) {
210             addInterfaceIdInQoSAlertCache(ifaceId);
211         }
212     }
213
214     private void addToQosAlertCache(InterfaceInfo interfaceInfo) {
215         Uint64 dpnId = interfaceInfo.getDpId();
216         if (dpnId.equals(Uint64.valueOf(0L))) {
217             LOG.warn("Interface {} could not be added to Qos Alert Cache because Dpn Id is not found",
218                     interfaceInfo.getInterfaceName());
219             return;
220         }
221
222         Port port = qosNeutronUtils.getNeutronPort(interfaceInfo.getInterfaceName());
223         if (port == null) {
224             LOG.warn("Port {} not added to Qos Alert Cache because it is not found", interfaceInfo.getInterfaceName());
225             return;
226         }
227
228         String portNumber = String.valueOf(interfaceInfo.getPortNo());
229
230         LOG.trace("Adding DPN ID {} with port {} port number {}", dpnId, port.getUuid(), portNumber);
231
232         qosAlertDpnPortNumberMap.computeIfAbsent(dpnId, key -> new ConcurrentHashMap<>())
233                 .put(portNumber, new QosAlertPortData(port, qosNeutronUtils, alertThresholdSupplier));
234     }
235
236     public void removeInterfaceIdFromQosAlertCache(String ifaceId) {
237
238         LOG.trace("If present, remove interface {} from cache", ifaceId);
239         unprocessedInterfaceIds.remove(ifaceId);
240         InterfaceInfo interfaceInfo =
241                 interfaceManager.getInterfaceInfoFromOperationalDataStore(ifaceId);
242         if (interfaceInfo == null) {
243             return;
244         }
245         Uint64 dpnId = interfaceInfo.getDpId();
246         String portNumber = String.valueOf(interfaceInfo.getPortNo());
247         removeFromQosAlertCache(dpnId, portNumber);
248     }
249
250     public void removeLowerLayerIfFromQosAlertCache(String lowerLayerIf) {
251         LOG.trace("If present, remove lowerLayerIf {} from cache", lowerLayerIf);
252         Uint64 dpnId = qosNeutronUtils.getDpnIdFromLowerLayerIf(lowerLayerIf);
253         String portNumber = qosNeutronUtils.getPortNumberFromLowerLayerIf(lowerLayerIf);
254         if (dpnId == null || portNumber == null) {
255             LOG.warn("Interface {} not in openflow:dpnid:portnum format, could not remove from cache", lowerLayerIf);
256             return;
257         }
258         removeFromQosAlertCache(dpnId, portNumber);
259     }
260
261     private void removeFromQosAlertCache(Uint64 dpnId, String portNumber) {
262         if (qosAlertDpnPortNumberMap.containsKey(dpnId)
263                 && qosAlertDpnPortNumberMap.get(dpnId).containsKey(portNumber)) {
264             qosAlertDpnPortNumberMap.get(dpnId).remove(portNumber);
265             LOG.trace("Removed interace {}:{} from cache", dpnId, portNumber);
266             if (qosAlertDpnPortNumberMap.get(dpnId).isEmpty()) {
267                 LOG.trace("DPN {} empty. Removing dpn from cache", dpnId);
268                 qosAlertDpnPortNumberMap.remove(dpnId);
269             }
270         }
271     }
272
273     private void writeConfigDataStore(boolean qosAlertEnabled, short dropPacketThreshold, int alertPollInterval) {
274
275         InstanceIdentifier<QosalertConfig> path = InstanceIdentifier.builder(QosalertConfig.class).build();
276
277         QosalertConfig qosAlertConfig = new QosalertConfigBuilder()
278                 .setQosDropPacketThreshold(dropPacketThreshold)
279                 .setQosAlertEnabled(qosAlertEnabled)
280                 .setQosAlertPollInterval(alertPollInterval)
281                 .build();
282
283         ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
284             tx -> tx.put(path, qosAlertConfig,
285                     CREATE_MISSING_PARENTS)), LOG, "Error writing to the config data store");
286     }
287
288     private void pollDirectStatisticsForAllNodes() {
289         LOG.trace("Polling direct statistics from nodes");
290
291         for (Entry<Uint64, ConcurrentMap<String, QosAlertPortData>> entry : qosAlertDpnPortNumberMap.entrySet()) {
292             Uint64 dpn = entry.getKey();
293             LOG.trace("Polling DPN ID {}", dpn);
294             GetNodeConnectorStatisticsInputBuilder input = new GetNodeConnectorStatisticsInputBuilder()
295                     .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
296                             .child(Node.class, new NodeKey(new NodeId(IfmConstants.OF_URI_PREFIX + dpn))).build()))
297                     .setStoreStats(false);
298             Future<RpcResult<GetNodeConnectorStatisticsOutput>> rpcResultFuture =
299                     odlDirectStatisticsService.getNodeConnectorStatistics(input.build());
300
301             RpcResult<GetNodeConnectorStatisticsOutput> rpcResult = null;
302             try {
303                 rpcResult = rpcResultFuture.get();
304             } catch (InterruptedException | ExecutionException e) {
305                 if (LOG.isDebugEnabled()) {
306                     LOG.debug("Could not get Direct-Statistics for node {} Exception occurred ", dpn, e);
307                 } else {
308                     LOG.info("Could not get Direct-Statistics for node {}", dpn);
309                 }
310             }
311             if (rpcResult != null && rpcResult.isSuccessful() && rpcResult.getResult() != null) {
312
313                 GetNodeConnectorStatisticsOutput nodeConnectorStatisticsOutput = rpcResult.getResult();
314
315                 List<NodeConnectorStatisticsAndPortNumberMap> nodeConnectorStatisticsAndPortNumberMapList =
316                         nodeConnectorStatisticsOutput.getNodeConnectorStatisticsAndPortNumberMap();
317
318                 ConcurrentMap<String, QosAlertPortData> portDataMap = entry.getValue();
319                 for (NodeConnectorStatisticsAndPortNumberMap stats : nodeConnectorStatisticsAndPortNumberMapList) {
320                     QosAlertPortData portData = portDataMap.get(stats.getNodeConnectorId().getValue());
321                     if (portData != null) {
322                         portData.updatePortStatistics(stats);
323                     }
324                 }
325             } else {
326                 LOG.info("Direct-Statistics not available for node {}", dpn);
327             }
328
329         }
330     }
331
332     private void initPortStatsData() {
333         qosAlertDpnPortNumberMap.values().forEach(portDataMap -> portDataMap.values()
334                 .forEach(QosAlertPortData::initPortData));
335     }
336
337     private static class AlertThresholdSupplier implements Supplier<Uint64> {
338         private volatile Uint64 alertThreshold = Uint64.valueOf(0);
339
340         void set(short threshold) {
341             alertThreshold = Uint64.valueOf(threshold);
342         }
343
344         @Override
345         public Uint64 get() {
346             return alertThreshold;
347         }
348     }
349 }