Handle nullable lists
[genius.git] / interfacemanager / interfacemanager-impl / src / main / java / org / opendaylight / genius / interfacemanager / pmcounters / NodeConnectorStatsImpl.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.interfacemanager.pmcounters;
9
10 import static org.opendaylight.genius.interfacemanager.IfmUtil.nullToEmpty;
11 import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningScheduledThreadPool;
12
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Set;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import javax.annotation.Nonnull;
28 import javax.annotation.PreDestroy;
29 import javax.inject.Inject;
30 import javax.inject.Singleton;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.genius.datastoreutils.AsyncClusteredDataTreeChangeListenerBase;
34 import org.opendaylight.genius.interfacemanager.IfmConstants;
35 import org.opendaylight.genius.interfacemanager.listeners.InterfaceChildCache;
36 import org.opendaylight.genius.interfacemanager.listeners.PortNameCache;
37 import org.opendaylight.genius.utils.clustering.EntityOwnershipUtils;
38 import org.opendaylight.infrautils.metrics.Counter;
39 import org.opendaylight.infrautils.metrics.Labeled;
40 import org.opendaylight.infrautils.metrics.MetricDescriptor;
41 import org.opendaylight.infrautils.metrics.MetricProvider;
42 import org.opendaylight.infrautils.utils.UncheckedCloseable;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInputBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.OpendaylightDirectStatisticsService;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.config.rev160406.IfmConfig;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.meta.rev160406._interface.child.info._interface.parent.entry.InterfaceChildEntry;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
60 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
61 import org.opendaylight.yangtools.yang.common.RpcResult;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 @Singleton
66 public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListenerBase<Node, NodeConnectorStatsImpl> {
67
68     private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorStatsImpl.class);
69
70     private static final int THREAD_POOL_SIZE = 4;
71     private final Set<String> nodes = ConcurrentHashMap.newKeySet();
72     private final Map<String, Set<Counter>> metricsCountersPerNodeMap = new ConcurrentHashMap<>();
73     private final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService;
74     private final ScheduledExecutorService portStatExecutorService;
75     private final EntityOwnershipUtils entityOwnershipUtils;
76     private final PortNameCache portNameCache;
77     private final InterfaceChildCache interfaceChildCache;
78     private final IfmConfig ifmConfig;
79     private final MetricProvider metricProvider;
80
81     private volatile int delayStatsQuery;
82     private ScheduledFuture<?> scheduledResult;
83
84     @Inject
85     public NodeConnectorStatsImpl(DataBroker dataBroker,
86                                   final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService,
87                                   final EntityOwnershipUtils entityOwnershipUtils,
88                                   final PortNameCache portNameCache,
89                                   final InterfaceChildCache interfaceChildCache,
90                                   final IfmConfig ifmConfigObj,
91                                   final MetricProvider metricProvider) {
92         super(Node.class, NodeConnectorStatsImpl.class);
93         this.opendaylightDirectStatisticsService = opendaylightDirectStatisticsService;
94         this.entityOwnershipUtils = entityOwnershipUtils;
95         this.portNameCache = portNameCache;
96         this.interfaceChildCache = interfaceChildCache;
97         this.ifmConfig = ifmConfigObj;
98         this.metricProvider = metricProvider;
99         registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker);
100         portStatExecutorService = newListeningScheduledThreadPool(THREAD_POOL_SIZE, "Port Stats Request Task", LOG);
101     }
102
103     @Override
104     public InstanceIdentifier<Node> getWildCardPath() {
105         return InstanceIdentifier.create(Nodes.class).child(Node.class);
106     }
107
108     @Override
109     protected NodeConnectorStatsImpl getDataTreeChangeListener() {
110         return NodeConnectorStatsImpl.this;
111     }
112
113     @Override
114     @PreDestroy
115     public void close() {
116         // close the nested counter objects for each node
117         metricsCountersPerNodeMap.values().forEach(counterSet -> counterSet.forEach(UncheckedCloseable::close));
118     }
119
120     /*
121      * PortStat request task is started when first DPN gets connected
122      */
123     private void schedulePortStatRequestTask() {
124         if (!ifmConfig.isIfmStatsPollEnabled()) {
125             LOG.info("Port statistics is turned off");
126             return;
127         }
128         LOG.info("Scheduling port statistics request");
129         PortStatRequestTask portStatRequestTask = new PortStatRequestTask();
130         scheduledResult = portStatExecutorService.scheduleWithFixedDelay(portStatRequestTask,
131                 ifmConfig.getIfmStatsDefPollInterval(), ifmConfig.getIfmStatsDefPollInterval(), TimeUnit.MINUTES);
132     }
133
134     /*
135      * PortStat request task is stopped when last DPN is removed.
136      */
137     private void stopPortStatRequestTask() {
138         if (scheduledResult != null) {
139             LOG.info("Stopping port statistics request");
140             scheduledResult.cancel(true);
141         }
142     }
143
144     /*
145      * This task queries for node connector statistics as well as flowtables
146      * statistics every 10 secs. Minimum period which can be configured for
147      * PMJob is 10 secs.
148      */
149     private class PortStatRequestTask implements Runnable {
150
151         @Override
152         public void run() {
153             if (LOG.isTraceEnabled()) {
154                 LOG.trace("Requesting port stats - {}");
155             }
156             for (String node : nodes) {
157                 LOG.trace("Requesting AllNodeConnectorStatistics and flow table statistics for node - {}", node);
158                 // Call RPC to Get NodeConnector Stats for node
159                 ListenableFuture<RpcResult<GetNodeConnectorStatisticsOutput>> ncStatsFuture =
160                         opendaylightDirectStatisticsService.getNodeConnectorStatistics(
161                             buildGetNodeConnectorStatisticsInput(node));
162
163                 Futures.addCallback(ncStatsFuture, new FutureCallback<RpcResult<GetNodeConnectorStatisticsOutput>>() {
164
165                     @Override
166                     public void onFailure(@Nonnull Throwable error) {
167                         LOG.error("getNodeConnectorStatistics RPC failed for node: {} ", node, error);
168                     }
169
170                     @Override
171                     public void onSuccess(RpcResult<GetNodeConnectorStatisticsOutput> result) {
172                         if (result != null) {
173                             if (result.isSuccessful()) {
174                                 GetNodeConnectorStatisticsOutput ncStatsRpcResult = result.getResult();
175                                 // process NodeConnectorStatistics RPC result
176                                 processNodeConnectorStatistics(ncStatsRpcResult, node);
177                             } else {
178                                 LOG.error("getNodeConnectorStatistics RPC failed for node: {} with error: {}",
179                                         node, result.getErrors());
180                             }
181                         }
182                     }
183                 }, MoreExecutors.directExecutor());
184
185                 // Call RPC to Get flow stats for node
186                 ListenableFuture<RpcResult<GetFlowStatisticsOutput>> flowStatsFuture =
187                         opendaylightDirectStatisticsService.getFlowStatistics(buildGetFlowStatisticsInput(node));
188
189                 Futures.addCallback(flowStatsFuture, new FutureCallback<RpcResult<GetFlowStatisticsOutput>>() {
190
191                     @Override
192                     public void onFailure(@Nonnull Throwable error) {
193                         LOG.error("getFlowStatistics RPC failed for node: {} ", node, error);
194                     }
195
196                     @Override
197                     public void onSuccess(RpcResult<GetFlowStatisticsOutput> result) {
198                         if (result != null) {
199                             if (result.isSuccessful()) {
200                                 GetFlowStatisticsOutput flowStatsRpcResult = result.getResult();
201                                 // process FlowStatistics RPC result
202                                 processFlowStatistics(flowStatsRpcResult, node);
203                             } else {
204                                 LOG.error("getFlowStatistics RPC failed for node: {} with error: {}",
205                                         node, result.getErrors());
206                             }
207                         }
208                     }
209                 }, MoreExecutors.directExecutor());
210
211                 delay();
212             }
213         }
214
215         /**
216          * The delay is added to spread the RPC call of the switches to query statistics
217          * across the polling interval.
218          * delay factor is calculated by dividing pollinginterval by no.of.switches.
219          */
220         private void delay() {
221             try {
222                 Thread.sleep(TimeUnit.SECONDS.toMillis(delayStatsQuery));
223             } catch (InterruptedException ex) {
224                 LOG.error("InterruptedException");
225             }
226         }
227
228         /**
229          * This method builds GetNodeConnectorStatisticsInput which is input for NodeConnectorStatistics RPC.
230          */
231         private GetNodeConnectorStatisticsInput buildGetNodeConnectorStatisticsInput(String dpId) {
232             return new GetNodeConnectorStatisticsInputBuilder()
233                     .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
234                             .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build()))
235                     .build();
236         }
237
238         /**
239          * This method builds GetFlowStatisticsInput which is input for FlowStatistics RPC.
240          */
241         private GetFlowStatisticsInput buildGetFlowStatisticsInput(String dpId) {
242             return new GetFlowStatisticsInputBuilder()
243                     .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
244                             .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build()))
245                     .build();
246         }
247     }
248
249     /**
250      * This method processes NodeConnectorStatistics RPC result.
251      * It performs:
252      * - fetches various OF Port counters values
253      * - creates/updates new OF Port counters using Infrautils metrics API
254      * - set counter with values fetched from NodeConnectorStatistics
255      */
256     private void processNodeConnectorStatistics(GetNodeConnectorStatisticsOutput nodeConnectorStatisticsOutput,
257                                                 String dpid) {
258         String port = "";
259         String portUuid = "";
260         List<NodeConnectorStatisticsAndPortNumberMap> ncStatsAndPortMapList = nodeConnectorStatisticsOutput
261                         .getNodeConnectorStatisticsAndPortNumberMap();
262         // Parse NodeConnectorStatistics and create/update counters for them
263         for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : nullToEmpty(ncStatsAndPortMapList)) {
264             NodeConnectorId nodeConnector = ncStatsAndPortMap.getNodeConnectorId();
265             LOG.trace("Create/update metric counter for NodeConnector: {} of node: {}", nodeConnector, dpid);
266             port = nodeConnector.getValue();
267             // update port name as per port name maintained in portNameCache
268             String portNameInCache = "openflow" + ":" + dpid + ":" + port;
269             java.util.Optional<String> portName = portNameCache.get(portNameInCache);
270             if (portName.isPresent()) {
271                 Optional<List<InterfaceChildEntry>> interfaceChildEntries = interfaceChildCache
272                         .getInterfaceChildEntries(portName.get());
273                 if (interfaceChildEntries.isPresent()) {
274                     if (!interfaceChildEntries.get().isEmpty()) {
275                         portUuid = interfaceChildEntries.get().get(0).getChildInterface();
276                         LOG.trace("Retrieved portUuid {} for portname {}", portUuid, portName.get());
277                     } else {
278                         LOG.trace("PortUuid is not found for portname {}. Skipping IFM counters publish for this port.",
279                             portName.get());
280                         continue;
281                     }
282                 } else {
283                     LOG.trace("PortUuid is not found for portname {}. Skipping IFM counters publish for this port.",
284                         portName.get());
285                     continue;
286                 }
287             }
288
289             Counter counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_DURATION, dpid, port, portUuid,null);
290             long ofPortDuration = ncStatsAndPortMap.getDuration().getSecond().getValue();
291             updateCounter(counter, ofPortDuration);
292
293             counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVDROP, dpid, port, portUuid, null);
294             long packetsPerOFPortReceiveDrop = ncStatsAndPortMap.getReceiveDrops().longValue();
295             updateCounter(counter, packetsPerOFPortReceiveDrop);
296
297             counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVERROR, dpid, port, portUuid, null);
298             long packetsPerOFPortReceiveError = ncStatsAndPortMap.getReceiveErrors().longValue();
299             updateCounter(counter, packetsPerOFPortReceiveError);
300
301             counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_SENT, dpid, port, portUuid, null);
302             long packetsPerOFPortSent = ncStatsAndPortMap.getPackets().getTransmitted().longValue();
303             updateCounter(counter, packetsPerOFPortSent);
304
305             counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECV, dpid, port, portUuid, null);
306             long packetsPerOFPortReceive = ncStatsAndPortMap.getPackets().getReceived().longValue();
307             updateCounter(counter, packetsPerOFPortReceive);
308
309             counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_SENT, dpid, port, portUuid, null);
310             long bytesPerOFPortSent = ncStatsAndPortMap.getBytes().getTransmitted().longValue();
311             updateCounter(counter, bytesPerOFPortSent);
312
313             counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_RECV, dpid, port, portUuid, null);
314             long bytesPerOFPortReceive = ncStatsAndPortMap.getBytes().getReceived().longValue();
315             updateCounter(counter, bytesPerOFPortReceive);
316         }
317     }
318
319     /**
320      * This method processes FlowStatistics RPC result.
321      * It performs:
322      * - fetches all flows of node
323      * - stores flows count per table in local map
324      * - creates/updates Flow table counters using Infrautils metrics API
325      * - set counter with values fetched from FlowStatistics
326      */
327     private void processFlowStatistics(GetFlowStatisticsOutput flowStatsOutput, String dpid) {
328         Map<Short, AtomicInteger> flowTableMap = new HashMap<>();
329         // Get all flows for node from RPC result
330         List<FlowAndStatisticsMapList> flowTableAndStatisticsMapList = flowStatsOutput.getFlowAndStatisticsMapList();
331         for (FlowAndStatisticsMapList flowAndStatisticsMap : nullToEmpty(flowTableAndStatisticsMapList)) {
332             short tableId = flowAndStatisticsMap.getTableId();
333             // populate map to maintain flow count per table
334             flowTableMap.computeIfAbsent(tableId, key -> new AtomicInteger(0)).incrementAndGet();
335         }
336         LOG.trace("FlowTableStatistics (tableId:counter): {} for node: {}", flowTableMap.entrySet(), dpid);
337         for (Map.Entry<Short, AtomicInteger> flowTable : flowTableMap.entrySet()) {
338             Short tableId = flowTable.getKey();
339             AtomicInteger flowCount = flowTable.getValue();
340             Counter counter = getCounter(CounterConstants.IFM_FLOW_TBL_COUNTER_FLOWS_PER_TBL, dpid, null, null,
341                     tableId.toString());
342             // update counter value
343             updateCounter(counter, flowCount.longValue());
344         }
345     }
346
347     /*
348      * This method returns counter and also creates counter if does not exist.
349      *
350      * @param counterName name of the counter
351      * @param switchId datapath-id value
352      * @param port port-id value
353      * @param aliasId alias-id value
354      * @param tableId table-id value of switch
355      * @return counter object
356      */
357     private Counter getCounter(String counterName, String switchId, String port, String aliasId, String tableId) {
358         /*
359          * Pattern to be followed for key generation:
360          *
361          * genius.interfacemanager.entitycounter{entitytype=port,switchid=value,portid=value,aliasid=value,
362          * name=counterName}
363          * genius.interfacemanager.entitycounter{entitytype=flowtable,switchid=value,flowtableid=value,name=counterName}
364          */
365         Counter counter = null;
366         if (port != null) {
367             Labeled<Labeled<Labeled<Labeled<Labeled<Counter>>>>> labeledCounter =
368                     metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius")
369                         .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
370                         CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
371                         CounterConstants.LBL_KEY_PORTID, CounterConstants.LBL_KEY_ALIASID,
372                         CounterConstants.LBL_KEY_COUNTER_NAME);
373             counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_PORT).label(switchId)
374                     .label(port).label(aliasId).label(counterName);
375         }
376         if (tableId != null) {
377             Labeled<Labeled<Labeled<Labeled<Counter>>>> labeledCounter =
378                     metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius")
379                         .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
380                         CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
381                         CounterConstants.LBL_KEY_FLOWTBLID, CounterConstants.LBL_KEY_COUNTER_NAME);
382             counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_FLOWTBL).label(switchId)
383                     .label(tableId).label(counterName);
384         }
385
386         // create counters set for node if absent.
387         // and then populate counter set with counter object
388         // which will be needed to close counters when node is removed.
389         metricsCountersPerNodeMap.computeIfAbsent(switchId, counterSet -> ConcurrentHashMap.newKeySet()).add(counter);
390
391         return counter;
392     }
393
394     /**
395      * This method updates counter values.
396      */
397     private void updateCounter(Counter counter, long counterValue) {
398         try {
399             // reset counter to zero
400             counter.decrement(counter.get());
401             // set counter to specified value
402             counter.increment(counterValue);
403         } catch (IllegalStateException e) {
404             LOG.error("Metric counter ({}) update has got exception: ", counter, e);
405         }
406     }
407
408     @Override
409     protected void remove(InstanceIdentifier<Node> identifier, Node node) {
410         NodeId nodeId = node.getId();
411         String dpId = nodeId.getValue().split(":")[1];
412         if (nodes.contains(dpId)) {
413             nodes.remove(dpId);
414             // remove counters set from node
415             Set<Counter> nodeMetricCounterSet = metricsCountersPerNodeMap.remove(dpId);
416             if (nodeMetricCounterSet != null) {
417                 // remove counters
418                 nodeMetricCounterSet.forEach(UncheckedCloseable::close);
419             }
420         }
421         if (nodes.size() > 0) {
422             delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval() / nodes.size();
423         } else {
424             stopPortStatRequestTask();
425             delayStatsQuery = 0;
426         }
427     }
428
429     @Override
430     protected void update(InstanceIdentifier<Node> identifier, Node original, Node update) {
431         // TODO Auto-generated method stub
432     }
433
434     @Override
435     protected void add(InstanceIdentifier<Node> identifier, Node node) {
436         NodeId nodeId = node.getId();
437         if (entityOwnershipUtils.isEntityOwner(IfmConstants.SERVICE_ENTITY_TYPE, nodeId.getValue())) {
438             LOG.trace("Locally connected switch {}",nodeId.getValue());
439             String dpId = nodeId.getValue().split(":")[1];
440             if (nodes.contains(dpId)) {
441                 return;
442             }
443             nodes.add(dpId);
444             delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval() / nodes.size();
445             if (nodes.size() == 1) {
446                 schedulePortStatRequestTask();
447             }
448         } else {
449             LOG.trace("Not a locally connected switch {}",nodeId.getValue());
450         }
451     }
452 }