2 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.genius.interfacemanager.pmcounters;
10 import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningScheduledThreadPool;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.util.HashMap;
17 import java.util.List;
19 import java.util.Optional;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import javax.annotation.PreDestroy;
27 import javax.inject.Inject;
28 import javax.inject.Singleton;
29 import org.apache.aries.blueprint.annotation.service.Reference;
30 import org.opendaylight.genius.interfacemanager.IfmConstants;
31 import org.opendaylight.genius.interfacemanager.listeners.InterfaceChildCache;
32 import org.opendaylight.genius.interfacemanager.listeners.PortNameCache;
33 import org.opendaylight.genius.utils.clustering.EntityOwnershipUtils;
34 import org.opendaylight.infrautils.metrics.Counter;
35 import org.opendaylight.infrautils.metrics.Labeled;
36 import org.opendaylight.infrautils.metrics.MetricDescriptor;
37 import org.opendaylight.infrautils.metrics.MetricProvider;
38 import org.opendaylight.infrautils.utils.UncheckedCloseable;
39 import org.opendaylight.infrautils.utils.concurrent.Executors;
40 import org.opendaylight.mdsal.binding.api.DataBroker;
41 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
42 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInputBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.OpendaylightDirectStatisticsService;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.config.rev160406.IfmConfig;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.meta.rev160406._interface.child.info._interface.parent.entry.InterfaceChildEntry;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
60 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
61 import org.opendaylight.yangtools.yang.common.RpcResult;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
66 public class NodeConnectorStatsImpl extends AbstractClusteredAsyncDataTreeChangeListener<Node> {
68 private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorStatsImpl.class);
70 private static final int THREAD_POOL_SIZE = 4;
71 private final Set<String> nodes = ConcurrentHashMap.newKeySet();
72 private final Map<String, Set<Counter>> metricsCountersPerNodeMap = new ConcurrentHashMap<>();
73 private final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService;
74 private final ScheduledExecutorService portStatExecutorService;
75 private final EntityOwnershipUtils entityOwnershipUtils;
76 private final PortNameCache portNameCache;
77 private final InterfaceChildCache interfaceChildCache;
78 private final IfmConfig ifmConfig;
79 private final MetricProvider metricProvider;
81 private volatile int delayStatsQuery;
82 private ScheduledFuture<?> scheduledResult;
/**
 * Builds the statistics poller.
 *
 * <p>The data broker, the OPERATIONAL datastore type, the {@code Nodes/Node} wildcard path and a
 * single-thread executor named "NodeConnectorStatsImpl" are passed to the listener base class. A
 * scheduled thread pool of {@link #THREAD_POOL_SIZE} threads is created for the statistics requests.
 *
 * @param dataBroker data broker handed to the listener base class
 * @param opendaylightDirectStatisticsService RPC service used to request port and flow statistics
 * @param entityOwnershipUtils used to check ownership of a node before it is polled
 * @param portNameCache cache consulted to resolve the name of an OpenFlow port
 * @param interfaceChildCache cache consulted to resolve the child interfaces of a port name
 * @param ifmConfigObj interface manager configuration (polling switch and polling interval)
 * @param metricProvider provider used to create the metric counters
 */
@Inject
public NodeConnectorStatsImpl(@Reference DataBroker dataBroker,
        final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService,
        final EntityOwnershipUtils entityOwnershipUtils,
        final PortNameCache portNameCache,
        final InterfaceChildCache interfaceChildCache,
        final IfmConfig ifmConfigObj,
        final @Reference MetricProvider metricProvider) {
    super(dataBroker,
            LogicalDatastoreType.OPERATIONAL,
            InstanceIdentifier.create(Nodes.class).child(Node.class),
            Executors.newSingleThreadExecutor("NodeConnectorStatsImpl", LOG));

    // Collaborators used while polling and publishing counters.
    this.metricProvider = metricProvider;
    this.ifmConfig = ifmConfigObj;
    this.interfaceChildCache = interfaceChildCache;
    this.portNameCache = portNameCache;
    this.entityOwnershipUtils = entityOwnershipUtils;
    this.opendaylightDirectStatisticsService = opendaylightDirectStatisticsService;

    // Pool shared by the periodic request task and the RPC result callbacks.
    this.portStatExecutorService =
            Executors.newListeningScheduledThreadPool(THREAD_POOL_SIZE, "Port Stats Request Task", LOG);
}
106 public void close() {
107 // close the nested counter objects for each node
108 metricsCountersPerNodeMap.values().forEach(counterSet -> counterSet.forEach(UncheckedCloseable::close));
112 * PortStat request task is started when first DPN gets connected
114 private void schedulePortStatRequestTask() {
115 if (!ifmConfig.isIfmStatsPollEnabled()) {
116 LOG.info("Port statistics is turned off");
119 LOG.info("Scheduling port statistics request");
120 PortStatRequestTask portStatRequestTask = new PortStatRequestTask();
121 scheduledResult = portStatExecutorService.scheduleWithFixedDelay(portStatRequestTask,
122 ifmConfig.getIfmStatsDefPollInterval().toJava(), ifmConfig.getIfmStatsDefPollInterval().toJava(),
127 * PortStat request task is stopped when last DPN is removed.
129 private void stopPortStatRequestTask() {
130 if (scheduledResult != null) {
131 LOG.info("Stopping port statistics request");
132 scheduledResult.cancel(true);
137 * This task queries for node connector statistics as well as flowtables
138 * statistics every 10 secs. Minimum period which can be configured for
141 private class PortStatRequestTask implements Runnable {
145 if (LOG.isTraceEnabled()) {
146 LOG.trace("Requesting port stats");
148 for (String node : nodes) {
149 LOG.trace("Requesting AllNodeConnectorStatistics and flow table statistics for node - {}", node);
150 // Call RPC to Get NodeConnector Stats for node
151 ListenableFuture<RpcResult<GetNodeConnectorStatisticsOutput>> ncStatsFuture =
152 opendaylightDirectStatisticsService.getNodeConnectorStatistics(
153 buildGetNodeConnectorStatisticsInput(node));
155 Futures.addCallback(ncStatsFuture, new FutureCallback<RpcResult<GetNodeConnectorStatisticsOutput>>() {
158 public void onFailure(Throwable error) {
159 LOG.error("getNodeConnectorStatistics RPC failed for node: {} ", node, error);
163 public void onSuccess(RpcResult<GetNodeConnectorStatisticsOutput> result) {
164 if (result != null) {
165 if (result.isSuccessful()) {
166 GetNodeConnectorStatisticsOutput ncStatsRpcResult = result.getResult();
167 // process NodeConnectorStatistics RPC result
168 processNodeConnectorStatistics(ncStatsRpcResult, node);
170 LOG.error("getNodeConnectorStatistics RPC failed for node: {} with error: {}",
171 node, result.getErrors());
175 }, portStatExecutorService);
177 // Call RPC to Get flow stats for node
178 ListenableFuture<RpcResult<GetFlowStatisticsOutput>> flowStatsFuture =
179 opendaylightDirectStatisticsService.getFlowStatistics(buildGetFlowStatisticsInput(node));
181 Futures.addCallback(flowStatsFuture, new FutureCallback<RpcResult<GetFlowStatisticsOutput>>() {
184 public void onFailure(Throwable error) {
185 LOG.error("getFlowStatistics RPC failed for node: {} ", node, error);
189 public void onSuccess(RpcResult<GetFlowStatisticsOutput> result) {
190 if (result != null) {
191 if (result.isSuccessful()) {
192 GetFlowStatisticsOutput flowStatsRpcResult = result.getResult();
193 // process FlowStatistics RPC result
194 processFlowStatistics(flowStatsRpcResult, node);
196 LOG.error("getFlowStatistics RPC failed for node: {} with error: {}",
197 node, result.getErrors());
201 }, portStatExecutorService);
208 * The delay is added to spread the RPC call of the switches to query statistics
209 * across the polling interval.
210 * delay factor is calculated by dividing pollinginterval by no.of.switches.
212 private void delay() {
214 Thread.sleep(TimeUnit.SECONDS.toMillis(delayStatsQuery));
215 } catch (InterruptedException ex) {
216 LOG.error("InterruptedException");
221 * This method builds GetNodeConnectorStatisticsInput which is input for NodeConnectorStatistics RPC.
223 private GetNodeConnectorStatisticsInput buildGetNodeConnectorStatisticsInput(String dpId) {
224 return new GetNodeConnectorStatisticsInputBuilder()
225 .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
226 .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build()))
231 * This method builds GetFlowStatisticsInput which is input for FlowStatistics RPC.
233 private GetFlowStatisticsInput buildGetFlowStatisticsInput(String dpId) {
234 return new GetFlowStatisticsInputBuilder()
235 .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
236 .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build()))
242 * This method processes NodeConnectorStatistics RPC result.
244 * - fetches various OF Port counters values
245 * - creates/updates new OF Port counters using Infrautils metrics API
246 * - set counter with values fetched from NodeConnectorStatistics
248 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
249 justification = "https://github.com/spotbugs/spotbugs/issues/811")
250 private void processNodeConnectorStatistics(GetNodeConnectorStatisticsOutput nodeConnectorStatisticsOutput,
253 String portUuid = "";
254 List<NodeConnectorStatisticsAndPortNumberMap> ncStatsAndPortMapList = nodeConnectorStatisticsOutput
255 .nonnullNodeConnectorStatisticsAndPortNumberMap();
256 // Parse NodeConnectorStatistics and create/update counters for them
257 for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : ncStatsAndPortMapList) {
258 NodeConnectorId nodeConnector = ncStatsAndPortMap.getNodeConnectorId();
259 LOG.trace("Create/update metric counter for NodeConnector: {} of node: {}", nodeConnector, dpid);
260 port = nodeConnector.getValue();
261 // update port name as per port name maintained in portNameCache
262 String portNameInCache = "openflow" + ":" + dpid + ":" + port;
263 java.util.Optional<String> portName = portNameCache.get(portNameInCache);
264 if (portName.isPresent()) {
265 Optional<List<InterfaceChildEntry>> interfaceChildEntries = interfaceChildCache
266 .getInterfaceChildEntries(portName.get());
267 if (interfaceChildEntries.isPresent()) {
268 if (!interfaceChildEntries.get().isEmpty()) {
269 portUuid = interfaceChildEntries.get().get(0).getChildInterface();
270 LOG.trace("Retrieved portUuid {} for portname {}", portUuid, portName.get());
272 LOG.trace("PortUuid is not found for portname {}. Skipping IFM counters publish for this port.",
277 LOG.trace("PortUuid is not present for portname {}. Skipping IFM counters publish for this port.",
282 LOG.trace("Port {} not found in PortName Cache.", portNameInCache);
286 Counter counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_DURATION, dpid, port, portUuid,null);
287 long ofPortDuration = ncStatsAndPortMap.getDuration().getSecond().getValue().toJava();
288 updateCounter(counter, ofPortDuration);
290 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVDROP, dpid, port, portUuid, null);
291 long packetsPerOFPortReceiveDrop = ncStatsAndPortMap.getReceiveDrops().longValue();
292 updateCounter(counter, packetsPerOFPortReceiveDrop);
294 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVERROR, dpid, port, portUuid, null);
295 long packetsPerOFPortReceiveError = ncStatsAndPortMap.getReceiveErrors().longValue();
296 updateCounter(counter, packetsPerOFPortReceiveError);
298 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_SENT, dpid, port, portUuid, null);
299 long packetsPerOFPortSent = ncStatsAndPortMap.getPackets().getTransmitted().longValue();
300 updateCounter(counter, packetsPerOFPortSent);
302 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECV, dpid, port, portUuid, null);
303 long packetsPerOFPortReceive = ncStatsAndPortMap.getPackets().getReceived().longValue();
304 updateCounter(counter, packetsPerOFPortReceive);
306 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_SENT, dpid, port, portUuid, null);
307 long bytesPerOFPortSent = ncStatsAndPortMap.getBytes().getTransmitted().longValue();
308 updateCounter(counter, bytesPerOFPortSent);
310 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_RECV, dpid, port, portUuid, null);
311 long bytesPerOFPortReceive = ncStatsAndPortMap.getBytes().getReceived().longValue();
312 updateCounter(counter, bytesPerOFPortReceive);
317 * This method processes FlowStatistics RPC result.
319 * - fetches all flows of node
320 * - stores flows count per table in local map
321 * - creates/updates Flow table counters using Infrautils metrics API
322 * - set counter with values fetched from FlowStatistics
324 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
325 justification = "https://github.com/spotbugs/spotbugs/issues/811")
326 private void processFlowStatistics(GetFlowStatisticsOutput flowStatsOutput, String dpid) {
327 Map<Short, AtomicInteger> flowTableMap = new HashMap<>();
328 // Get all flows for node from RPC result
329 List<FlowAndStatisticsMapList> flowTableAndStatisticsMapList =
330 flowStatsOutput.nonnullFlowAndStatisticsMapList();
331 for (FlowAndStatisticsMapList flowAndStatisticsMap : flowTableAndStatisticsMapList) {
332 short tableId = flowAndStatisticsMap.getTableId().toJava();
333 // populate map to maintain flow count per table
334 flowTableMap.computeIfAbsent(tableId, key -> new AtomicInteger(0)).incrementAndGet();
336 LOG.trace("FlowTableStatistics (tableId:counter): {} for node: {}", flowTableMap.entrySet(), dpid);
337 for (Map.Entry<Short, AtomicInteger> flowTable : flowTableMap.entrySet()) {
338 Short tableId = flowTable.getKey();
339 AtomicInteger flowCount = flowTable.getValue();
340 Counter counter = getCounter(CounterConstants.IFM_FLOW_TBL_COUNTER_FLOWS_PER_TBL, dpid, null, null,
342 // update counter value
343 updateCounter(counter, flowCount.longValue());
348 * This method returns counter and also creates counter if does not exist.
350 * @param counterName name of the counter
351 * @param switchId datapath-id value
352 * @param port port-id value
353 * @param aliasId alias-id value
354 * @param tableId table-id value of switch
355 * @return counter object
357 private Counter getCounter(String counterName, String switchId, String port, String aliasId, String tableId) {
359 * Pattern to be followed for key generation:
361 * genius.interfacemanager.entitycounter{entitytype=port,switchid=value,portid=value,aliasid=value,
363 * genius.interfacemanager.entitycounter{entitytype=flowtable,switchid=value,flowtableid=value,name=counterName}
365 Counter counter = null;
367 Labeled<Labeled<Labeled<Labeled<Labeled<Counter>>>>> labeledCounter =
368 metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius")
369 .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
370 CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
371 CounterConstants.LBL_KEY_PORTID, CounterConstants.LBL_KEY_ALIASID,
372 CounterConstants.LBL_KEY_COUNTER_NAME);
373 counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_PORT).label(switchId)
374 .label(port).label(aliasId).label(counterName);
376 if (tableId != null) {
377 Labeled<Labeled<Labeled<Labeled<Counter>>>> labeledCounter =
378 metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius")
379 .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
380 CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
381 CounterConstants.LBL_KEY_FLOWTBLID, CounterConstants.LBL_KEY_COUNTER_NAME);
382 counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_FLOWTBL).label(switchId)
383 .label(tableId).label(counterName);
386 // create counters set for node if absent.
387 // and then populate counter set with counter object
388 // which will be needed to close counters when node is removed.
389 metricsCountersPerNodeMap.computeIfAbsent(switchId, counterSet -> ConcurrentHashMap.newKeySet()).add(counter);
395 * This method updates counter values.
397 private void updateCounter(Counter counter, long counterValue) {
399 // reset counter to zero
400 counter.decrement(counter.get());
401 // set counter to specified value
402 counter.increment(counterValue);
403 } catch (IllegalStateException e) {
404 LOG.error("Metric counter ({}) update has got exception: ", counter, e);
409 public void remove(InstanceIdentifier<Node> identifier, Node node) {
410 NodeId nodeId = node.getId();
411 String dpId = nodeId.getValue().split(":")[1];
412 if (nodes.contains(dpId)) {
414 // remove counters set from node
415 Set<Counter> nodeMetricCounterSet = metricsCountersPerNodeMap.remove(dpId);
416 if (nodeMetricCounterSet != null) {
418 nodeMetricCounterSet.forEach(UncheckedCloseable::close);
421 if (nodes.size() > 0) {
422 delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval().toJava() / nodes.size();
424 stopPortStatRequestTask();
430 public void update(InstanceIdentifier<Node> identifier, Node original, Node update) {
431 // TODO Auto-generated method stub
435 public void add(InstanceIdentifier<Node> identifier, Node node) {
436 NodeId nodeId = node.getId();
437 if (entityOwnershipUtils.isEntityOwner(IfmConstants.SERVICE_ENTITY_TYPE, nodeId.getValue())) {
438 LOG.trace("Locally connected switch {}",nodeId.getValue());
439 String dpId = nodeId.getValue().split(":")[1];
440 if (nodes.contains(dpId)) {
444 delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval().toJava() / nodes.size();
445 if (nodes.size() == 1) {
446 schedulePortStatRequestTask();
449 LOG.trace("Not a locally connected switch {}",nodeId.getValue());