2 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.genius.interfacemanager.pmcounters;
10 import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningScheduledThreadPool;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.util.HashMap;
18 import java.util.Optional;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ScheduledExecutorService;
22 import java.util.concurrent.ScheduledFuture;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import javax.annotation.PreDestroy;
26 import javax.inject.Inject;
27 import javax.inject.Singleton;
28 import org.apache.aries.blueprint.annotation.service.Reference;
29 import org.eclipse.jdt.annotation.NonNull;
30 import org.opendaylight.genius.interfacemanager.IfmConstants;
31 import org.opendaylight.genius.interfacemanager.listeners.InterfaceChildCache;
32 import org.opendaylight.genius.interfacemanager.listeners.PortNameCache;
33 import org.opendaylight.genius.utils.clustering.EntityOwnershipUtils;
34 import org.opendaylight.infrautils.metrics.Counter;
35 import org.opendaylight.infrautils.metrics.Labeled;
36 import org.opendaylight.infrautils.metrics.MetricDescriptor;
37 import org.opendaylight.infrautils.metrics.MetricProvider;
38 import org.opendaylight.infrautils.utils.UncheckedCloseable;
39 import org.opendaylight.infrautils.utils.concurrent.Executors;
40 import org.opendaylight.mdsal.binding.api.DataBroker;
41 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
42 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInputBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.OpendaylightDirectStatisticsService;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListKey;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.config.rev160406.IfmConfig;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.meta.rev160406._interface.child.info._interface.parent.entry.InterfaceChildEntry;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.meta.rev160406._interface.child.info._interface.parent.entry.InterfaceChildEntryKey;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMapKey;
63 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
64 import org.opendaylight.yangtools.yang.common.RpcResult;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
69 public class NodeConnectorStatsImpl extends AbstractClusteredAsyncDataTreeChangeListener<Node> {
71 private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorStatsImpl.class);
73 private static final int THREAD_POOL_SIZE = 4;
74 private final Set<String> nodes = ConcurrentHashMap.newKeySet();
75 private final Map<String, Set<Counter>> metricsCountersPerNodeMap = new ConcurrentHashMap<>();
76 private final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService;
77 private final ScheduledExecutorService portStatExecutorService;
78 private final EntityOwnershipUtils entityOwnershipUtils;
79 private final PortNameCache portNameCache;
80 private final InterfaceChildCache interfaceChildCache;
81 private final IfmConfig ifmConfig;
82 private final MetricProvider metricProvider;
84 private volatile int delayStatsQuery;
85 private ScheduledFuture<?> scheduledResult;
/**
 * Wires the listener to the operational {@code Nodes/Node} subtree and prepares the
 * executor used for statistics polling.
 *
 * <p>Notifications are dispatched on a dedicated single-thread executor named
 * {@code NodeConnectorStatsImpl}; polling and RPC callbacks run on a separate scheduled
 * pool of {@link #THREAD_POOL_SIZE} threads.
 *
 * @param dataBroker broker the listener registers with
 * @param opendaylightDirectStatisticsService RPC service used to fetch port and flow statistics
 * @param entityOwnershipUtils used to decide whether a switch is owned by this instance
 * @param portNameCache lookup from node-connector id to port name
 * @param interfaceChildCache lookup from port name to its child interface entries
 * @param ifmConfigObj interface-manager configuration (poll enable flag and poll interval)
 * @param metricProvider factory for the metric counters
 */
@Inject
public NodeConnectorStatsImpl(@Reference DataBroker dataBroker,
        final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService,
        final EntityOwnershipUtils entityOwnershipUtils,
        final PortNameCache portNameCache,
        final InterfaceChildCache interfaceChildCache,
        final IfmConfig ifmConfigObj,
        final @Reference MetricProvider metricProvider) {
    super(dataBroker,
            LogicalDatastoreType.OPERATIONAL,
            InstanceIdentifier.create(Nodes.class).child(Node.class),
            Executors.newSingleThreadExecutor("NodeConnectorStatsImpl", LOG));

    // Configuration and metric plumbing.
    this.ifmConfig = ifmConfigObj;
    this.metricProvider = metricProvider;

    // Services and caches consulted while polling.
    this.opendaylightDirectStatisticsService = opendaylightDirectStatisticsService;
    this.entityOwnershipUtils = entityOwnershipUtils;
    this.portNameCache = portNameCache;
    this.interfaceChildCache = interfaceChildCache;

    this.portStatExecutorService =
            newListeningScheduledThreadPool(THREAD_POOL_SIZE, "Port Stats Request Task", LOG);
}
109 public void close() {
110 // close the nested counter objects for each node
111 metricsCountersPerNodeMap.values().forEach(counterSet -> counterSet.forEach(UncheckedCloseable::close));
115 * PortStat request task is started when first DPN gets connected
117 private void schedulePortStatRequestTask() {
118 if (!ifmConfig.isIfmStatsPollEnabled()) {
119 LOG.info("Port statistics is turned off");
122 LOG.info("Scheduling port statistics request");
123 PortStatRequestTask portStatRequestTask = new PortStatRequestTask();
124 scheduledResult = portStatExecutorService.scheduleWithFixedDelay(portStatRequestTask,
125 ifmConfig.getIfmStatsDefPollInterval().toJava(), ifmConfig.getIfmStatsDefPollInterval().toJava(),
130 * PortStat request task is stopped when last DPN is removed.
132 private void stopPortStatRequestTask() {
133 if (scheduledResult != null) {
134 LOG.info("Stopping port statistics request");
135 scheduledResult.cancel(true);
140 * This task queries for node connector statistics as well as flowtables
141 * statistics every 10 secs. Minimum period which can be configured for
144 private class PortStatRequestTask implements Runnable {
148 if (LOG.isTraceEnabled()) {
149 LOG.trace("Requesting port stats");
151 for (String node : nodes) {
152 LOG.trace("Requesting AllNodeConnectorStatistics and flow table statistics for node - {}", node);
153 // Call RPC to Get NodeConnector Stats for node
154 ListenableFuture<RpcResult<GetNodeConnectorStatisticsOutput>> ncStatsFuture =
155 opendaylightDirectStatisticsService.getNodeConnectorStatistics(
156 buildGetNodeConnectorStatisticsInput(node));
158 Futures.addCallback(ncStatsFuture, new FutureCallback<RpcResult<GetNodeConnectorStatisticsOutput>>() {
161 public void onFailure(Throwable error) {
162 LOG.error("getNodeConnectorStatistics RPC failed for node: {} ", node, error);
166 public void onSuccess(RpcResult<GetNodeConnectorStatisticsOutput> result) {
167 if (result != null) {
168 if (result.isSuccessful()) {
169 GetNodeConnectorStatisticsOutput ncStatsRpcResult = result.getResult();
170 // process NodeConnectorStatistics RPC result
171 processNodeConnectorStatistics(ncStatsRpcResult, node);
173 LOG.error("getNodeConnectorStatistics RPC failed for node: {} with error: {}",
174 node, result.getErrors());
178 }, portStatExecutorService);
180 // Call RPC to Get flow stats for node
181 ListenableFuture<RpcResult<GetFlowStatisticsOutput>> flowStatsFuture =
182 opendaylightDirectStatisticsService.getFlowStatistics(buildGetFlowStatisticsInput(node));
184 Futures.addCallback(flowStatsFuture, new FutureCallback<RpcResult<GetFlowStatisticsOutput>>() {
187 public void onFailure(Throwable error) {
188 LOG.error("getFlowStatistics RPC failed for node: {} ", node, error);
192 public void onSuccess(RpcResult<GetFlowStatisticsOutput> result) {
193 if (result != null) {
194 if (result.isSuccessful()) {
195 GetFlowStatisticsOutput flowStatsRpcResult = result.getResult();
196 // process FlowStatistics RPC result
197 processFlowStatistics(flowStatsRpcResult, node);
199 LOG.error("getFlowStatistics RPC failed for node: {} with error: {}",
200 node, result.getErrors());
204 }, portStatExecutorService);
211 * The delay is added to spread the RPC call of the switches to query statistics
212 * across the polling interval.
213 * delay factor is calculated by dividing pollinginterval by no.of.switches.
215 private void delay() {
217 Thread.sleep(TimeUnit.SECONDS.toMillis(delayStatsQuery));
218 } catch (InterruptedException ex) {
219 LOG.error("InterruptedException");
224 * This method builds GetNodeConnectorStatisticsInput which is input for NodeConnectorStatistics RPC.
226 private GetNodeConnectorStatisticsInput buildGetNodeConnectorStatisticsInput(String dpId) {
227 return new GetNodeConnectorStatisticsInputBuilder()
228 .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
229 .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build()))
234 * This method builds GetFlowStatisticsInput which is input for FlowStatistics RPC.
236 private GetFlowStatisticsInput buildGetFlowStatisticsInput(String dpId) {
237 return new GetFlowStatisticsInputBuilder()
238 .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
239 .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build()))
245 * This method processes NodeConnectorStatistics RPC result.
247 * - fetches various OF Port counters values
248 * - creates/updates new OF Port counters using Infrautils metrics API
249 * - set counter with values fetched from NodeConnectorStatistics
251 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
252 justification = "https://github.com/spotbugs/spotbugs/issues/811")
253 private void processNodeConnectorStatistics(GetNodeConnectorStatisticsOutput nodeConnectorStatisticsOutput,
256 String portUuid = "";
257 @NonNull Map<NodeConnectorStatisticsAndPortNumberMapKey, NodeConnectorStatisticsAndPortNumberMap>
258 ncStatsAndPortMapList = nodeConnectorStatisticsOutput.nonnullNodeConnectorStatisticsAndPortNumberMap();
259 // Parse NodeConnectorStatistics and create/update counters for them
260 for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : ncStatsAndPortMapList.values()) {
261 NodeConnectorId nodeConnector = ncStatsAndPortMap.getNodeConnectorId();
262 LOG.trace("Create/update metric counter for NodeConnector: {} of node: {}", nodeConnector, dpid);
263 port = nodeConnector.getValue();
264 // update port name as per port name maintained in portNameCache
265 String portNameInCache = "openflow" + ":" + dpid + ":" + port;
266 java.util.Optional<String> portName = portNameCache.get(portNameInCache);
267 if (portName.isPresent()) {
268 Optional<Map<InterfaceChildEntryKey, InterfaceChildEntry>> interfaceChildEntries = interfaceChildCache
269 .getInterfaceChildEntries(portName.get());
270 if (interfaceChildEntries.isPresent()) {
271 if (!interfaceChildEntries.get().isEmpty()) {
272 portUuid = interfaceChildEntries.get().get(0).getChildInterface();
273 LOG.trace("Retrieved portUuid {} for portname {}", portUuid, portName.get());
275 LOG.trace("PortUuid is not found for portname {}. Skipping IFM counters publish for this port.",
280 LOG.trace("PortUuid is not present for portname {}. Skipping IFM counters publish for this port.",
285 LOG.trace("Port {} not found in PortName Cache.", portNameInCache);
289 Counter counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_DURATION, dpid, port, portUuid,null);
290 long ofPortDuration = ncStatsAndPortMap.getDuration().getSecond().getValue().toJava();
291 updateCounter(counter, ofPortDuration);
293 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVDROP, dpid, port, portUuid, null);
294 long packetsPerOFPortReceiveDrop = ncStatsAndPortMap.getReceiveDrops().longValue();
295 updateCounter(counter, packetsPerOFPortReceiveDrop);
297 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVERROR, dpid, port, portUuid, null);
298 long packetsPerOFPortReceiveError = ncStatsAndPortMap.getReceiveErrors().longValue();
299 updateCounter(counter, packetsPerOFPortReceiveError);
301 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_SENT, dpid, port, portUuid, null);
302 long packetsPerOFPortSent = ncStatsAndPortMap.getPackets().getTransmitted().longValue();
303 updateCounter(counter, packetsPerOFPortSent);
305 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECV, dpid, port, portUuid, null);
306 long packetsPerOFPortReceive = ncStatsAndPortMap.getPackets().getReceived().longValue();
307 updateCounter(counter, packetsPerOFPortReceive);
309 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_SENT, dpid, port, portUuid, null);
310 long bytesPerOFPortSent = ncStatsAndPortMap.getBytes().getTransmitted().longValue();
311 updateCounter(counter, bytesPerOFPortSent);
313 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_RECV, dpid, port, portUuid, null);
314 long bytesPerOFPortReceive = ncStatsAndPortMap.getBytes().getReceived().longValue();
315 updateCounter(counter, bytesPerOFPortReceive);
320 * This method processes FlowStatistics RPC result.
322 * - fetches all flows of node
323 * - stores flows count per table in local map
324 * - creates/updates Flow table counters using Infrautils metrics API
325 * - set counter with values fetched from FlowStatistics
327 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
328 justification = "https://github.com/spotbugs/spotbugs/issues/811")
329 private void processFlowStatistics(GetFlowStatisticsOutput flowStatsOutput, String dpid) {
330 Map<Short, AtomicInteger> flowTableMap = new HashMap<>();
331 // Get all flows for node from RPC result
332 @NonNull Map<FlowAndStatisticsMapListKey, FlowAndStatisticsMapList> flowTableAndStatisticsMapList =
333 flowStatsOutput.nonnullFlowAndStatisticsMapList();
334 for (FlowAndStatisticsMapList flowAndStatisticsMap : flowTableAndStatisticsMapList.values()) {
335 short tableId = flowAndStatisticsMap.getTableId().toJava();
336 // populate map to maintain flow count per table
337 flowTableMap.computeIfAbsent(tableId, key -> new AtomicInteger(0)).incrementAndGet();
339 LOG.trace("FlowTableStatistics (tableId:counter): {} for node: {}", flowTableMap.entrySet(), dpid);
340 for (Map.Entry<Short, AtomicInteger> flowTable : flowTableMap.entrySet()) {
341 Short tableId = flowTable.getKey();
342 AtomicInteger flowCount = flowTable.getValue();
343 Counter counter = getCounter(CounterConstants.IFM_FLOW_TBL_COUNTER_FLOWS_PER_TBL, dpid, null, null,
345 // update counter value
346 updateCounter(counter, flowCount.longValue());
351 * This method returns counter and also creates counter if does not exist.
353 * @param counterName name of the counter
354 * @param switchId datapath-id value
355 * @param port port-id value
356 * @param aliasId alias-id value
357 * @param tableId table-id value of switch
358 * @return counter object
360 private Counter getCounter(String counterName, String switchId, String port, String aliasId, String tableId) {
362 * Pattern to be followed for key generation:
364 * genius.interfacemanager.entitycounter{entitytype=port,switchid=value,portid=value,aliasid=value,
366 * genius.interfacemanager.entitycounter{entitytype=flowtable,switchid=value,flowtableid=value,name=counterName}
368 Counter counter = null;
370 Labeled<Labeled<Labeled<Labeled<Labeled<Counter>>>>> labeledCounter =
371 metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius")
372 .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
373 CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
374 CounterConstants.LBL_KEY_PORTID, CounterConstants.LBL_KEY_ALIASID,
375 CounterConstants.LBL_KEY_COUNTER_NAME);
376 counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_PORT).label(switchId)
377 .label(port).label(aliasId).label(counterName);
379 if (tableId != null) {
380 Labeled<Labeled<Labeled<Labeled<Counter>>>> labeledCounter =
381 metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius")
382 .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
383 CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
384 CounterConstants.LBL_KEY_FLOWTBLID, CounterConstants.LBL_KEY_COUNTER_NAME);
385 counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_FLOWTBL).label(switchId)
386 .label(tableId).label(counterName);
389 // create counters set for node if absent.
390 // and then populate counter set with counter object
391 // which will be needed to close counters when node is removed.
392 metricsCountersPerNodeMap.computeIfAbsent(switchId, counterSet -> ConcurrentHashMap.newKeySet()).add(counter);
398 * This method updates counter values.
400 private void updateCounter(Counter counter, long counterValue) {
402 // reset counter to zero
403 counter.decrement(counter.get());
404 // set counter to specified value
405 counter.increment(counterValue);
406 } catch (IllegalStateException e) {
407 LOG.error("Metric counter ({}) update has got exception: ", counter, e);
412 public void remove(InstanceIdentifier<Node> identifier, Node node) {
413 NodeId nodeId = node.getId();
414 String dpId = nodeId.getValue().split(":")[1];
415 if (nodes.contains(dpId)) {
417 // remove counters set from node
418 Set<Counter> nodeMetricCounterSet = metricsCountersPerNodeMap.remove(dpId);
419 if (nodeMetricCounterSet != null) {
421 nodeMetricCounterSet.forEach(UncheckedCloseable::close);
424 if (nodes.size() > 0) {
425 delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval().toJava() / nodes.size();
427 stopPortStatRequestTask();
433 public void update(InstanceIdentifier<Node> identifier, Node original, Node update) {
434 // TODO Auto-generated method stub
438 public void add(InstanceIdentifier<Node> identifier, Node node) {
439 NodeId nodeId = node.getId();
440 if (entityOwnershipUtils.isEntityOwner(IfmConstants.SERVICE_ENTITY_TYPE, nodeId.getValue())) {
441 LOG.trace("Locally connected switch {}",nodeId.getValue());
442 String dpId = nodeId.getValue().split(":")[1];
443 if (nodes.contains(dpId)) {
447 delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval().toJava() / nodes.size();
448 if (nodes.size() == 1) {
449 schedulePortStatRequestTask();
452 LOG.trace("Not a locally connected switch {}",nodeId.getValue());