0007c725ef23a3a9f9a2f159d77a693962f61eb3
[genius.git] / interfacemanager / interfacemanager-impl / src / main / java / org / opendaylight / genius / interfacemanager / pmcounters / NodeConnectorStatsImpl.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.interfacemanager.pmcounters;
9
10 import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningScheduledThreadPool;
11
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.util.HashMap;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.Optional;
20 import java.util.Set;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import javax.annotation.PreDestroy;
27 import javax.inject.Inject;
28 import javax.inject.Singleton;
29 import org.apache.aries.blueprint.annotation.service.Reference;
30 import org.opendaylight.genius.interfacemanager.IfmConstants;
31 import org.opendaylight.genius.interfacemanager.listeners.InterfaceChildCache;
32 import org.opendaylight.genius.interfacemanager.listeners.PortNameCache;
33 import org.opendaylight.genius.utils.clustering.EntityOwnershipUtils;
34 import org.opendaylight.infrautils.metrics.Counter;
35 import org.opendaylight.infrautils.metrics.Labeled;
36 import org.opendaylight.infrautils.metrics.MetricDescriptor;
37 import org.opendaylight.infrautils.metrics.MetricProvider;
38 import org.opendaylight.infrautils.utils.UncheckedCloseable;
39 import org.opendaylight.infrautils.utils.concurrent.Executors;
40 import org.opendaylight.mdsal.binding.api.DataBroker;
41 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
42 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInputBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.OpendaylightDirectStatisticsService;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.config.rev160406.IfmConfig;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.meta.rev160406._interface.child.info._interface.parent.entry.InterfaceChildEntry;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
60 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
61 import org.opendaylight.yangtools.yang.common.RpcResult;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 @Singleton
66 public class NodeConnectorStatsImpl extends AbstractClusteredAsyncDataTreeChangeListener<Node> {
67
68     private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorStatsImpl.class);
69
70     private static final int THREAD_POOL_SIZE = 4;
71     private final Set<String> nodes = ConcurrentHashMap.newKeySet();
72     private final Map<String, Set<Counter>> metricsCountersPerNodeMap = new ConcurrentHashMap<>();
73     private final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService;
74     private final ScheduledExecutorService portStatExecutorService;
75     private final EntityOwnershipUtils entityOwnershipUtils;
76     private final PortNameCache portNameCache;
77     private final InterfaceChildCache interfaceChildCache;
78     private final IfmConfig ifmConfig;
79     private final MetricProvider metricProvider;
80
81     private volatile int delayStatsQuery;
82     private ScheduledFuture<?> scheduledResult;
83
84     @Inject
85     public NodeConnectorStatsImpl(@Reference DataBroker dataBroker,
86                                   final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService,
87                                   final EntityOwnershipUtils entityOwnershipUtils,
88                                   final PortNameCache portNameCache,
89                                   final InterfaceChildCache interfaceChildCache,
90                                   final IfmConfig ifmConfigObj,
91                                   final @Reference  MetricProvider metricProvider) {
92         super(dataBroker, LogicalDatastoreType.OPERATIONAL,
93                 InstanceIdentifier.create(Nodes.class).child(Node.class),
94                 Executors.newSingleThreadExecutor("NodeConnectorStatsImpl", LOG));
95         this.opendaylightDirectStatisticsService = opendaylightDirectStatisticsService;
96         this.entityOwnershipUtils = entityOwnershipUtils;
97         this.portNameCache = portNameCache;
98         this.interfaceChildCache = interfaceChildCache;
99         this.ifmConfig = ifmConfigObj;
100         this.metricProvider = metricProvider;
101         portStatExecutorService = newListeningScheduledThreadPool(THREAD_POOL_SIZE, "Port Stats Request Task", LOG);
102     }
103
104     @Override
105     @PreDestroy
106     public void close() {
107         // close the nested counter objects for each node
108         metricsCountersPerNodeMap.values().forEach(counterSet -> counterSet.forEach(UncheckedCloseable::close));
109     }
110
111     /*
112      * PortStat request task is started when first DPN gets connected
113      */
114     private void schedulePortStatRequestTask() {
115         if (!ifmConfig.isIfmStatsPollEnabled()) {
116             LOG.info("Port statistics is turned off");
117             return;
118         }
119         LOG.info("Scheduling port statistics request");
120         PortStatRequestTask portStatRequestTask = new PortStatRequestTask();
121         scheduledResult = portStatExecutorService.scheduleWithFixedDelay(portStatRequestTask,
122                 ifmConfig.getIfmStatsDefPollInterval().toJava(), ifmConfig.getIfmStatsDefPollInterval().toJava(),
123                 TimeUnit.MINUTES);
124     }
125
126     /*
127      * PortStat request task is stopped when last DPN is removed.
128      */
129     private void stopPortStatRequestTask() {
130         if (scheduledResult != null) {
131             LOG.info("Stopping port statistics request");
132             scheduledResult.cancel(true);
133         }
134     }
135
136     /*
137      * This task queries for node connector statistics as well as flowtables
138      * statistics every 10 secs. Minimum period which can be configured for
139      * PMJob is 10 secs.
140      */
141     private class PortStatRequestTask implements Runnable {
142
143         @Override
144         public void run() {
145             if (LOG.isTraceEnabled()) {
146                 LOG.trace("Requesting port stats");
147             }
148             for (String node : nodes) {
149                 LOG.trace("Requesting AllNodeConnectorStatistics and flow table statistics for node - {}", node);
150                 // Call RPC to Get NodeConnector Stats for node
151                 ListenableFuture<RpcResult<GetNodeConnectorStatisticsOutput>> ncStatsFuture =
152                         opendaylightDirectStatisticsService.getNodeConnectorStatistics(
153                             buildGetNodeConnectorStatisticsInput(node));
154
155                 Futures.addCallback(ncStatsFuture, new FutureCallback<RpcResult<GetNodeConnectorStatisticsOutput>>() {
156
157                     @Override
158                     public void onFailure(Throwable error) {
159                         LOG.error("getNodeConnectorStatistics RPC failed for node: {} ", node, error);
160                     }
161
162                     @Override
163                     public void onSuccess(RpcResult<GetNodeConnectorStatisticsOutput> result) {
164                         if (result != null) {
165                             if (result.isSuccessful()) {
166                                 GetNodeConnectorStatisticsOutput ncStatsRpcResult = result.getResult();
167                                 // process NodeConnectorStatistics RPC result
168                                 processNodeConnectorStatistics(ncStatsRpcResult, node);
169                             } else {
170                                 LOG.error("getNodeConnectorStatistics RPC failed for node: {} with error: {}",
171                                         node, result.getErrors());
172                             }
173                         }
174                     }
175                 }, portStatExecutorService);
176
177                 // Call RPC to Get flow stats for node
178                 ListenableFuture<RpcResult<GetFlowStatisticsOutput>> flowStatsFuture =
179                         opendaylightDirectStatisticsService.getFlowStatistics(buildGetFlowStatisticsInput(node));
180
181                 Futures.addCallback(flowStatsFuture, new FutureCallback<RpcResult<GetFlowStatisticsOutput>>() {
182
183                     @Override
184                     public void onFailure(Throwable error) {
185                         LOG.error("getFlowStatistics RPC failed for node: {} ", node, error);
186                     }
187
188                     @Override
189                     public void onSuccess(RpcResult<GetFlowStatisticsOutput> result) {
190                         if (result != null) {
191                             if (result.isSuccessful()) {
192                                 GetFlowStatisticsOutput flowStatsRpcResult = result.getResult();
193                                 // process FlowStatistics RPC result
194                                 processFlowStatistics(flowStatsRpcResult, node);
195                             } else {
196                                 LOG.error("getFlowStatistics RPC failed for node: {} with error: {}",
197                                         node, result.getErrors());
198                             }
199                         }
200                     }
201                 }, portStatExecutorService);
202
203                 delay();
204             }
205         }
206
207         /**
208          * The delay is added to spread the RPC call of the switches to query statistics
209          * across the polling interval.
210          * delay factor is calculated by dividing pollinginterval by no.of.switches.
211          */
212         private void delay() {
213             try {
214                 Thread.sleep(TimeUnit.SECONDS.toMillis(delayStatsQuery));
215             } catch (InterruptedException ex) {
216                 LOG.error("InterruptedException");
217             }
218         }
219
220         /**
221          * This method builds GetNodeConnectorStatisticsInput which is input for NodeConnectorStatistics RPC.
222          */
223         private GetNodeConnectorStatisticsInput buildGetNodeConnectorStatisticsInput(String dpId) {
224             return new GetNodeConnectorStatisticsInputBuilder()
225                     .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
226                             .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build()))
227                     .build();
228         }
229
230         /**
231          * This method builds GetFlowStatisticsInput which is input for FlowStatistics RPC.
232          */
233         private GetFlowStatisticsInput buildGetFlowStatisticsInput(String dpId) {
234             return new GetFlowStatisticsInputBuilder()
235                     .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
236                             .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build()))
237                     .build();
238         }
239     }
240
241     /**
242      * This method processes NodeConnectorStatistics RPC result.
243      * It performs:
244      * - fetches various OF Port counters values
245      * - creates/updates new OF Port counters using Infrautils metrics API
246      * - set counter with values fetched from NodeConnectorStatistics
247      */
248     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
249             justification = "https://github.com/spotbugs/spotbugs/issues/811")
250     private void processNodeConnectorStatistics(GetNodeConnectorStatisticsOutput nodeConnectorStatisticsOutput,
251                                                 String dpid) {
252         String port = "";
253         String portUuid = "";
254         List<NodeConnectorStatisticsAndPortNumberMap> ncStatsAndPortMapList = nodeConnectorStatisticsOutput
255                         .nonnullNodeConnectorStatisticsAndPortNumberMap();
256         // Parse NodeConnectorStatistics and create/update counters for them
257         for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : ncStatsAndPortMapList) {
258             NodeConnectorId nodeConnector = ncStatsAndPortMap.getNodeConnectorId();
259             LOG.trace("Create/update metric counter for NodeConnector: {} of node: {}", nodeConnector, dpid);
260             port = nodeConnector.getValue();
261             // update port name as per port name maintained in portNameCache
262             String portNameInCache = "openflow" + ":" + dpid + ":" + port;
263             java.util.Optional<String> portName = portNameCache.get(portNameInCache);
264             if (portName.isPresent()) {
265                 Optional<List<InterfaceChildEntry>> interfaceChildEntries = interfaceChildCache
266                         .getInterfaceChildEntries(portName.get());
267                 if (interfaceChildEntries.isPresent()) {
268                     if (!interfaceChildEntries.get().isEmpty()) {
269                         portUuid = interfaceChildEntries.get().get(0).getChildInterface();
270                         LOG.trace("Retrieved portUuid {} for portname {}", portUuid, portName.get());
271                     } else {
272                         LOG.trace("PortUuid is not found for portname {}. Skipping IFM counters publish for this port.",
273                             portName.get());
274                         continue;
275                     }
276                 } else {
277                     LOG.trace("PortUuid is not present for portname {}. Skipping IFM counters publish for this port.",
278                         portName.get());
279                     continue;
280                 }
281             } else {
282                 LOG.trace("Port {} not found in PortName Cache.", portNameInCache);
283                 continue;
284             }
285
286             Counter counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_DURATION, dpid, port, portUuid,null);
287             long ofPortDuration = ncStatsAndPortMap.getDuration().getSecond().getValue().toJava();
288             updateCounter(counter, ofPortDuration);
289
290             counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVDROP, dpid, port, portUuid, null);
291             long packetsPerOFPortReceiveDrop = ncStatsAndPortMap.getReceiveDrops().longValue();
292             updateCounter(counter, packetsPerOFPortReceiveDrop);
293
294             counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVERROR, dpid, port, portUuid, null);
295             long packetsPerOFPortReceiveError = ncStatsAndPortMap.getReceiveErrors().longValue();
296             updateCounter(counter, packetsPerOFPortReceiveError);
297
298             counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_SENT, dpid, port, portUuid, null);
299             long packetsPerOFPortSent = ncStatsAndPortMap.getPackets().getTransmitted().longValue();
300             updateCounter(counter, packetsPerOFPortSent);
301
302             counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECV, dpid, port, portUuid, null);
303             long packetsPerOFPortReceive = ncStatsAndPortMap.getPackets().getReceived().longValue();
304             updateCounter(counter, packetsPerOFPortReceive);
305
306             counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_SENT, dpid, port, portUuid, null);
307             long bytesPerOFPortSent = ncStatsAndPortMap.getBytes().getTransmitted().longValue();
308             updateCounter(counter, bytesPerOFPortSent);
309
310             counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_RECV, dpid, port, portUuid, null);
311             long bytesPerOFPortReceive = ncStatsAndPortMap.getBytes().getReceived().longValue();
312             updateCounter(counter, bytesPerOFPortReceive);
313         }
314     }
315
316     /**
317      * This method processes FlowStatistics RPC result.
318      * It performs:
319      * - fetches all flows of node
320      * - stores flows count per table in local map
321      * - creates/updates Flow table counters using Infrautils metrics API
322      * - set counter with values fetched from FlowStatistics
323      */
324     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
325             justification = "https://github.com/spotbugs/spotbugs/issues/811")
326     private void processFlowStatistics(GetFlowStatisticsOutput flowStatsOutput, String dpid) {
327         Map<Short, AtomicInteger> flowTableMap = new HashMap<>();
328         // Get all flows for node from RPC result
329         List<FlowAndStatisticsMapList> flowTableAndStatisticsMapList =
330             flowStatsOutput.nonnullFlowAndStatisticsMapList();
331         for (FlowAndStatisticsMapList flowAndStatisticsMap : flowTableAndStatisticsMapList) {
332             short tableId = flowAndStatisticsMap.getTableId().toJava();
333             // populate map to maintain flow count per table
334             flowTableMap.computeIfAbsent(tableId, key -> new AtomicInteger(0)).incrementAndGet();
335         }
336         LOG.trace("FlowTableStatistics (tableId:counter): {} for node: {}", flowTableMap.entrySet(), dpid);
337         for (Map.Entry<Short, AtomicInteger> flowTable : flowTableMap.entrySet()) {
338             Short tableId = flowTable.getKey();
339             AtomicInteger flowCount = flowTable.getValue();
340             Counter counter = getCounter(CounterConstants.IFM_FLOW_TBL_COUNTER_FLOWS_PER_TBL, dpid, null, null,
341                     tableId.toString());
342             // update counter value
343             updateCounter(counter, flowCount.longValue());
344         }
345     }
346
347     /*
348      * This method returns counter and also creates counter if does not exist.
349      *
350      * @param counterName name of the counter
351      * @param switchId datapath-id value
352      * @param port port-id value
353      * @param aliasId alias-id value
354      * @param tableId table-id value of switch
355      * @return counter object
356      */
357     private Counter getCounter(String counterName, String switchId, String port, String aliasId, String tableId) {
358         /*
359          * Pattern to be followed for key generation:
360          *
361          * genius.interfacemanager.entitycounter{entitytype=port,switchid=value,portid=value,aliasid=value,
362          * name=counterName}
363          * genius.interfacemanager.entitycounter{entitytype=flowtable,switchid=value,flowtableid=value,name=counterName}
364          */
365         Counter counter = null;
366         if (port != null) {
367             Labeled<Labeled<Labeled<Labeled<Labeled<Counter>>>>> labeledCounter =
368                     metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius")
369                         .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
370                         CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
371                         CounterConstants.LBL_KEY_PORTID, CounterConstants.LBL_KEY_ALIASID,
372                         CounterConstants.LBL_KEY_COUNTER_NAME);
373             counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_PORT).label(switchId)
374                     .label(port).label(aliasId).label(counterName);
375         }
376         if (tableId != null) {
377             Labeled<Labeled<Labeled<Labeled<Counter>>>> labeledCounter =
378                     metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius")
379                         .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
380                         CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
381                         CounterConstants.LBL_KEY_FLOWTBLID, CounterConstants.LBL_KEY_COUNTER_NAME);
382             counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_FLOWTBL).label(switchId)
383                     .label(tableId).label(counterName);
384         }
385
386         // create counters set for node if absent.
387         // and then populate counter set with counter object
388         // which will be needed to close counters when node is removed.
389         metricsCountersPerNodeMap.computeIfAbsent(switchId, counterSet -> ConcurrentHashMap.newKeySet()).add(counter);
390
391         return counter;
392     }
393
394     /**
395      * This method updates counter values.
396      */
397     private void updateCounter(Counter counter, long counterValue) {
398         try {
399             // reset counter to zero
400             counter.decrement(counter.get());
401             // set counter to specified value
402             counter.increment(counterValue);
403         } catch (IllegalStateException e) {
404             LOG.error("Metric counter ({}) update has got exception: ", counter, e);
405         }
406     }
407
408     @Override
409     public void remove(InstanceIdentifier<Node> identifier, Node node) {
410         NodeId nodeId = node.getId();
411         String dpId = nodeId.getValue().split(":")[1];
412         if (nodes.contains(dpId)) {
413             nodes.remove(dpId);
414             // remove counters set from node
415             Set<Counter> nodeMetricCounterSet = metricsCountersPerNodeMap.remove(dpId);
416             if (nodeMetricCounterSet != null) {
417                 // remove counters
418                 nodeMetricCounterSet.forEach(UncheckedCloseable::close);
419             }
420         }
421         if (nodes.size() > 0) {
422             delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval().toJava() / nodes.size();
423         } else {
424             stopPortStatRequestTask();
425             delayStatsQuery = 0;
426         }
427     }
428
429     @Override
430     public void update(InstanceIdentifier<Node> identifier, Node original, Node update) {
431         // TODO Auto-generated method stub
432     }
433
434     @Override
435     public void add(InstanceIdentifier<Node> identifier, Node node) {
436         NodeId nodeId = node.getId();
437         if (entityOwnershipUtils.isEntityOwner(IfmConstants.SERVICE_ENTITY_TYPE, nodeId.getValue())) {
438             LOG.trace("Locally connected switch {}",nodeId.getValue());
439             String dpId = nodeId.getValue().split(":")[1];
440             if (nodes.contains(dpId)) {
441                 return;
442             }
443             nodes.add(dpId);
444             delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval().toJava() / nodes.size();
445             if (nodes.size() == 1) {
446                 schedulePortStatRequestTask();
447             }
448         } else {
449             LOG.trace("Not a locally connected switch {}",nodeId.getValue());
450         }
451     }
452 }