97e740c5694019b6c930f899fdb3bc6274298d25
[genius.git] / interfacemanager / interfacemanager-impl / src / main / java / org / opendaylight / genius / interfacemanager / pmcounters / NodeConnectorStatsImpl.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.interfacemanager.pmcounters;
9
10 import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningScheduledThreadPool;
11
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.util.HashMap;
17 import java.util.Map;
18 import java.util.Optional;
19 import java.util.Set;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ScheduledExecutorService;
22 import java.util.concurrent.ScheduledFuture;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import javax.annotation.PreDestroy;
26 import javax.inject.Inject;
27 import javax.inject.Singleton;
28 import org.apache.aries.blueprint.annotation.service.Reference;
29 import org.eclipse.jdt.annotation.NonNull;
30 import org.opendaylight.genius.interfacemanager.IfmConstants;
31 import org.opendaylight.genius.interfacemanager.listeners.InterfaceChildCache;
32 import org.opendaylight.genius.interfacemanager.listeners.PortNameCache;
33 import org.opendaylight.genius.utils.clustering.EntityOwnershipUtils;
34 import org.opendaylight.infrautils.metrics.Counter;
35 import org.opendaylight.infrautils.metrics.Labeled;
36 import org.opendaylight.infrautils.metrics.MetricDescriptor;
37 import org.opendaylight.infrautils.metrics.MetricProvider;
38 import org.opendaylight.infrautils.utils.UncheckedCloseable;
39 import org.opendaylight.infrautils.utils.concurrent.Executors;
40 import org.opendaylight.mdsal.binding.api.DataBroker;
41 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
42 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInputBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.OpendaylightDirectStatisticsService;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListKey;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.config.rev160406.IfmConfig;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.meta.rev160406._interface.child.info._interface.parent.entry.InterfaceChildEntry;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.meta.rev160406._interface.child.info._interface.parent.entry.InterfaceChildEntryKey;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMapKey;
63 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
64 import org.opendaylight.yangtools.yang.common.RpcResult;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
68 @Singleton
69 public class NodeConnectorStatsImpl extends AbstractClusteredAsyncDataTreeChangeListener<Node> {
70
71     private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorStatsImpl.class);
72
73     private static final int THREAD_POOL_SIZE = 4;
74     private final Set<String> nodes = ConcurrentHashMap.newKeySet();
75     private final Map<String, Set<Counter>> metricsCountersPerNodeMap = new ConcurrentHashMap<>();
76     private final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService;
77     private final ScheduledExecutorService portStatExecutorService;
78     private final EntityOwnershipUtils entityOwnershipUtils;
79     private final PortNameCache portNameCache;
80     private final InterfaceChildCache interfaceChildCache;
81     private final IfmConfig ifmConfig;
82     private final MetricProvider metricProvider;
83
84     private volatile int delayStatsQuery;
85     private ScheduledFuture<?> scheduledResult;
86
87     @Inject
88     public NodeConnectorStatsImpl(@Reference DataBroker dataBroker,
89                                   final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService,
90                                   final EntityOwnershipUtils entityOwnershipUtils,
91                                   final PortNameCache portNameCache,
92                                   final InterfaceChildCache interfaceChildCache,
93                                   final IfmConfig ifmConfigObj,
94                                   final @Reference  MetricProvider metricProvider) {
95         super(dataBroker, LogicalDatastoreType.OPERATIONAL,
96                 InstanceIdentifier.create(Nodes.class).child(Node.class),
97                 Executors.newSingleThreadExecutor("NodeConnectorStatsImpl", LOG));
98         this.opendaylightDirectStatisticsService = opendaylightDirectStatisticsService;
99         this.entityOwnershipUtils = entityOwnershipUtils;
100         this.portNameCache = portNameCache;
101         this.interfaceChildCache = interfaceChildCache;
102         this.ifmConfig = ifmConfigObj;
103         this.metricProvider = metricProvider;
104         portStatExecutorService = newListeningScheduledThreadPool(THREAD_POOL_SIZE, "Port Stats Request Task", LOG);
105     }
106
107     @Override
108     @PreDestroy
109     public void close() {
110         // close the nested counter objects for each node
111         metricsCountersPerNodeMap.values().forEach(counterSet -> counterSet.forEach(UncheckedCloseable::close));
112     }
113
114     /*
115      * PortStat request task is started when first DPN gets connected
116      */
117     private void schedulePortStatRequestTask() {
118         if (!ifmConfig.isIfmStatsPollEnabled()) {
119             LOG.info("Port statistics is turned off");
120             return;
121         }
122         LOG.info("Scheduling port statistics request");
123         PortStatRequestTask portStatRequestTask = new PortStatRequestTask();
124         scheduledResult = portStatExecutorService.scheduleWithFixedDelay(portStatRequestTask,
125                 ifmConfig.getIfmStatsDefPollInterval().toJava(), ifmConfig.getIfmStatsDefPollInterval().toJava(),
126                 TimeUnit.MINUTES);
127     }
128
129     /*
130      * PortStat request task is stopped when last DPN is removed.
131      */
132     private void stopPortStatRequestTask() {
133         if (scheduledResult != null) {
134             LOG.info("Stopping port statistics request");
135             scheduledResult.cancel(true);
136         }
137     }
138
139     /*
140      * This task queries for node connector statistics as well as flowtables
141      * statistics every 10 secs. Minimum period which can be configured for
142      * PMJob is 10 secs.
143      */
144     private class PortStatRequestTask implements Runnable {
145
146         @Override
147         public void run() {
148             if (LOG.isTraceEnabled()) {
149                 LOG.trace("Requesting port stats");
150             }
151             for (String node : nodes) {
152                 LOG.trace("Requesting AllNodeConnectorStatistics and flow table statistics for node - {}", node);
153                 // Call RPC to Get NodeConnector Stats for node
154                 ListenableFuture<RpcResult<GetNodeConnectorStatisticsOutput>> ncStatsFuture =
155                         opendaylightDirectStatisticsService.getNodeConnectorStatistics(
156                             buildGetNodeConnectorStatisticsInput(node));
157
158                 Futures.addCallback(ncStatsFuture, new FutureCallback<RpcResult<GetNodeConnectorStatisticsOutput>>() {
159
160                     @Override
161                     public void onFailure(Throwable error) {
162                         LOG.error("getNodeConnectorStatistics RPC failed for node: {} ", node, error);
163                     }
164
165                     @Override
166                     public void onSuccess(RpcResult<GetNodeConnectorStatisticsOutput> result) {
167                         if (result != null) {
168                             if (result.isSuccessful()) {
169                                 GetNodeConnectorStatisticsOutput ncStatsRpcResult = result.getResult();
170                                 // process NodeConnectorStatistics RPC result
171                                 processNodeConnectorStatistics(ncStatsRpcResult, node);
172                             } else {
173                                 LOG.error("getNodeConnectorStatistics RPC failed for node: {} with error: {}",
174                                         node, result.getErrors());
175                             }
176                         }
177                     }
178                 }, portStatExecutorService);
179
180                 // Call RPC to Get flow stats for node
181                 ListenableFuture<RpcResult<GetFlowStatisticsOutput>> flowStatsFuture =
182                         opendaylightDirectStatisticsService.getFlowStatistics(buildGetFlowStatisticsInput(node));
183
184                 Futures.addCallback(flowStatsFuture, new FutureCallback<RpcResult<GetFlowStatisticsOutput>>() {
185
186                     @Override
187                     public void onFailure(Throwable error) {
188                         LOG.error("getFlowStatistics RPC failed for node: {} ", node, error);
189                     }
190
191                     @Override
192                     public void onSuccess(RpcResult<GetFlowStatisticsOutput> result) {
193                         if (result != null) {
194                             if (result.isSuccessful()) {
195                                 GetFlowStatisticsOutput flowStatsRpcResult = result.getResult();
196                                 // process FlowStatistics RPC result
197                                 processFlowStatistics(flowStatsRpcResult, node);
198                             } else {
199                                 LOG.error("getFlowStatistics RPC failed for node: {} with error: {}",
200                                         node, result.getErrors());
201                             }
202                         }
203                     }
204                 }, portStatExecutorService);
205
206                 delay();
207             }
208         }
209
210         /**
211          * The delay is added to spread the RPC call of the switches to query statistics
212          * across the polling interval.
213          * delay factor is calculated by dividing pollinginterval by no.of.switches.
214          */
215         private void delay() {
216             try {
217                 Thread.sleep(TimeUnit.SECONDS.toMillis(delayStatsQuery));
218             } catch (InterruptedException ex) {
219                 LOG.error("InterruptedException");
220             }
221         }
222
223         /**
224          * This method builds GetNodeConnectorStatisticsInput which is input for NodeConnectorStatistics RPC.
225          */
226         private GetNodeConnectorStatisticsInput buildGetNodeConnectorStatisticsInput(String dpId) {
227             return new GetNodeConnectorStatisticsInputBuilder()
228                     .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
229                             .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build()))
230                     .build();
231         }
232
233         /**
234          * This method builds GetFlowStatisticsInput which is input for FlowStatistics RPC.
235          */
236         private GetFlowStatisticsInput buildGetFlowStatisticsInput(String dpId) {
237             return new GetFlowStatisticsInputBuilder()
238                     .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
239                             .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build()))
240                     .build();
241         }
242     }
243
244     /**
245      * This method processes NodeConnectorStatistics RPC result.
246      * It performs:
247      * - fetches various OF Port counters values
248      * - creates/updates new OF Port counters using Infrautils metrics API
249      * - set counter with values fetched from NodeConnectorStatistics
250      */
251     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
252             justification = "https://github.com/spotbugs/spotbugs/issues/811")
253     private void processNodeConnectorStatistics(GetNodeConnectorStatisticsOutput nodeConnectorStatisticsOutput,
254                                                 String dpid) {
255         String port = "";
256         String portUuid = "";
257         @NonNull Map<NodeConnectorStatisticsAndPortNumberMapKey, NodeConnectorStatisticsAndPortNumberMap>
258                 ncStatsAndPortMapList = nodeConnectorStatisticsOutput.nonnullNodeConnectorStatisticsAndPortNumberMap();
259         // Parse NodeConnectorStatistics and create/update counters for them
260         for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : ncStatsAndPortMapList.values()) {
261             NodeConnectorId nodeConnector = ncStatsAndPortMap.getNodeConnectorId();
262             LOG.trace("Create/update metric counter for NodeConnector: {} of node: {}", nodeConnector, dpid);
263             port = nodeConnector.getValue();
264             // update port name as per port name maintained in portNameCache
265             String portNameInCache = "openflow" + ":" + dpid + ":" + port;
266             java.util.Optional<String> portName = portNameCache.get(portNameInCache);
267             if (portName.isPresent()) {
268                 Optional<Map<InterfaceChildEntryKey, InterfaceChildEntry>> interfaceChildEntries = interfaceChildCache
269                         .getInterfaceChildEntries(portName.get());
270                 if (interfaceChildEntries.isPresent()) {
271                     if (!interfaceChildEntries.get().isEmpty()) {
272                         portUuid = interfaceChildEntries.get().get(0).getChildInterface();
273                         LOG.trace("Retrieved portUuid {} for portname {}", portUuid, portName.get());
274                     } else {
275                         LOG.trace("PortUuid is not found for portname {}. Skipping IFM counters publish for this port.",
276                             portName.get());
277                         continue;
278                     }
279                 } else {
280                     LOG.trace("PortUuid is not present for portname {}. Skipping IFM counters publish for this port.",
281                         portName.get());
282                     continue;
283                 }
284             } else {
285                 LOG.trace("Port {} not found in PortName Cache.", portNameInCache);
286                 continue;
287             }
288
289             Counter counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_DURATION, dpid, port, portUuid,null);
290             long ofPortDuration = ncStatsAndPortMap.getDuration().getSecond().getValue().toJava();
291             updateCounter(counter, ofPortDuration);
292
293             counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVDROP, dpid, port, portUuid, null);
294             long packetsPerOFPortReceiveDrop = ncStatsAndPortMap.getReceiveDrops().longValue();
295             updateCounter(counter, packetsPerOFPortReceiveDrop);
296
297             counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVERROR, dpid, port, portUuid, null);
298             long packetsPerOFPortReceiveError = ncStatsAndPortMap.getReceiveErrors().longValue();
299             updateCounter(counter, packetsPerOFPortReceiveError);
300
301             counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_SENT, dpid, port, portUuid, null);
302             long packetsPerOFPortSent = ncStatsAndPortMap.getPackets().getTransmitted().longValue();
303             updateCounter(counter, packetsPerOFPortSent);
304
305             counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECV, dpid, port, portUuid, null);
306             long packetsPerOFPortReceive = ncStatsAndPortMap.getPackets().getReceived().longValue();
307             updateCounter(counter, packetsPerOFPortReceive);
308
309             counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_SENT, dpid, port, portUuid, null);
310             long bytesPerOFPortSent = ncStatsAndPortMap.getBytes().getTransmitted().longValue();
311             updateCounter(counter, bytesPerOFPortSent);
312
313             counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_RECV, dpid, port, portUuid, null);
314             long bytesPerOFPortReceive = ncStatsAndPortMap.getBytes().getReceived().longValue();
315             updateCounter(counter, bytesPerOFPortReceive);
316         }
317     }
318
319     /**
320      * This method processes FlowStatistics RPC result.
321      * It performs:
322      * - fetches all flows of node
323      * - stores flows count per table in local map
324      * - creates/updates Flow table counters using Infrautils metrics API
325      * - set counter with values fetched from FlowStatistics
326      */
327     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
328             justification = "https://github.com/spotbugs/spotbugs/issues/811")
329     private void processFlowStatistics(GetFlowStatisticsOutput flowStatsOutput, String dpid) {
330         Map<Short, AtomicInteger> flowTableMap = new HashMap<>();
331         // Get all flows for node from RPC result
332         @NonNull Map<FlowAndStatisticsMapListKey, FlowAndStatisticsMapList> flowTableAndStatisticsMapList =
333             flowStatsOutput.nonnullFlowAndStatisticsMapList();
334         for (FlowAndStatisticsMapList flowAndStatisticsMap : flowTableAndStatisticsMapList.values()) {
335             short tableId = flowAndStatisticsMap.getTableId().toJava();
336             // populate map to maintain flow count per table
337             flowTableMap.computeIfAbsent(tableId, key -> new AtomicInteger(0)).incrementAndGet();
338         }
339         LOG.trace("FlowTableStatistics (tableId:counter): {} for node: {}", flowTableMap.entrySet(), dpid);
340         for (Map.Entry<Short, AtomicInteger> flowTable : flowTableMap.entrySet()) {
341             Short tableId = flowTable.getKey();
342             AtomicInteger flowCount = flowTable.getValue();
343             Counter counter = getCounter(CounterConstants.IFM_FLOW_TBL_COUNTER_FLOWS_PER_TBL, dpid, null, null,
344                     tableId.toString());
345             // update counter value
346             updateCounter(counter, flowCount.longValue());
347         }
348     }
349
350     /*
351      * This method returns counter and also creates counter if does not exist.
352      *
353      * @param counterName name of the counter
354      * @param switchId datapath-id value
355      * @param port port-id value
356      * @param aliasId alias-id value
357      * @param tableId table-id value of switch
358      * @return counter object
359      */
360     private Counter getCounter(String counterName, String switchId, String port, String aliasId, String tableId) {
361         /*
362          * Pattern to be followed for key generation:
363          *
364          * genius.interfacemanager.entitycounter{entitytype=port,switchid=value,portid=value,aliasid=value,
365          * name=counterName}
366          * genius.interfacemanager.entitycounter{entitytype=flowtable,switchid=value,flowtableid=value,name=counterName}
367          */
368         Counter counter = null;
369         if (port != null) {
370             Labeled<Labeled<Labeled<Labeled<Labeled<Counter>>>>> labeledCounter =
371                     metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius")
372                         .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
373                         CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
374                         CounterConstants.LBL_KEY_PORTID, CounterConstants.LBL_KEY_ALIASID,
375                         CounterConstants.LBL_KEY_COUNTER_NAME);
376             counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_PORT).label(switchId)
377                     .label(port).label(aliasId).label(counterName);
378         }
379         if (tableId != null) {
380             Labeled<Labeled<Labeled<Labeled<Counter>>>> labeledCounter =
381                     metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius")
382                         .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
383                         CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
384                         CounterConstants.LBL_KEY_FLOWTBLID, CounterConstants.LBL_KEY_COUNTER_NAME);
385             counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_FLOWTBL).label(switchId)
386                     .label(tableId).label(counterName);
387         }
388
389         // create counters set for node if absent.
390         // and then populate counter set with counter object
391         // which will be needed to close counters when node is removed.
392         metricsCountersPerNodeMap.computeIfAbsent(switchId, counterSet -> ConcurrentHashMap.newKeySet()).add(counter);
393
394         return counter;
395     }
396
397     /**
398      * This method updates counter values.
399      */
400     private void updateCounter(Counter counter, long counterValue) {
401         try {
402             // reset counter to zero
403             counter.decrement(counter.get());
404             // set counter to specified value
405             counter.increment(counterValue);
406         } catch (IllegalStateException e) {
407             LOG.error("Metric counter ({}) update has got exception: ", counter, e);
408         }
409     }
410
411     @Override
412     public void remove(InstanceIdentifier<Node> identifier, Node node) {
413         NodeId nodeId = node.getId();
414         String dpId = nodeId.getValue().split(":")[1];
415         if (nodes.contains(dpId)) {
416             nodes.remove(dpId);
417             // remove counters set from node
418             Set<Counter> nodeMetricCounterSet = metricsCountersPerNodeMap.remove(dpId);
419             if (nodeMetricCounterSet != null) {
420                 // remove counters
421                 nodeMetricCounterSet.forEach(UncheckedCloseable::close);
422             }
423         }
424         if (nodes.size() > 0) {
425             delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval().toJava() / nodes.size();
426         } else {
427             stopPortStatRequestTask();
428             delayStatsQuery = 0;
429         }
430     }
431
432     @Override
433     public void update(InstanceIdentifier<Node> identifier, Node original, Node update) {
434         // TODO Auto-generated method stub
435     }
436
437     @Override
438     public void add(InstanceIdentifier<Node> identifier, Node node) {
439         NodeId nodeId = node.getId();
440         if (entityOwnershipUtils.isEntityOwner(IfmConstants.SERVICE_ENTITY_TYPE, nodeId.getValue())) {
441             LOG.trace("Locally connected switch {}",nodeId.getValue());
442             String dpId = nodeId.getValue().split(":")[1];
443             if (nodes.contains(dpId)) {
444                 return;
445             }
446             nodes.add(dpId);
447             delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval().toJava() / nodes.size();
448             if (nodes.size() == 1) {
449                 schedulePortStatRequestTask();
450             }
451         } else {
452             LOG.trace("Not a locally connected switch {}",nodeId.getValue());
453         }
454     }
455 }