/*
 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.genius.interfacemanager.pmcounters;
10 import static org.opendaylight.genius.interfacemanager.IfmUtil.nullToEmpty;
11 import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningScheduledThreadPool;
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.HashMap;
19 import java.util.List;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import javax.annotation.Nonnull;
28 import javax.annotation.PreDestroy;
29 import javax.inject.Inject;
30 import javax.inject.Singleton;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.genius.datastoreutils.AsyncClusteredDataTreeChangeListenerBase;
34 import org.opendaylight.genius.interfacemanager.IfmConstants;
35 import org.opendaylight.genius.interfacemanager.listeners.InterfaceChildCache;
36 import org.opendaylight.genius.interfacemanager.listeners.PortNameCache;
37 import org.opendaylight.genius.utils.clustering.EntityOwnershipUtils;
38 import org.opendaylight.infrautils.metrics.Counter;
39 import org.opendaylight.infrautils.metrics.Labeled;
40 import org.opendaylight.infrautils.metrics.MetricDescriptor;
41 import org.opendaylight.infrautils.metrics.MetricProvider;
42 import org.opendaylight.infrautils.utils.UncheckedCloseable;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInputBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.OpendaylightDirectStatisticsService;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.config.rev160406.IfmConfig;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.meta.rev160406._interface.child.info._interface.parent.entry.InterfaceChildEntry;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
60 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
61 import org.opendaylight.yangtools.yang.common.RpcResult;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
66 public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListenerBase<Node, NodeConnectorStatsImpl> {
68 private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorStatsImpl.class);
70 private static final int THREAD_POOL_SIZE = 4;
71 private final Set<String> nodes = ConcurrentHashMap.newKeySet();
72 private final Map<String, Set<Counter>> metricsCountersPerNodeMap = new ConcurrentHashMap<>();
73 private final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService;
74 private final ScheduledExecutorService portStatExecutorService;
75 private final EntityOwnershipUtils entityOwnershipUtils;
76 private final PortNameCache portNameCache;
77 private final InterfaceChildCache interfaceChildCache;
78 private final IfmConfig ifmConfig;
79 private final MetricProvider metricProvider;
81 private volatile int delayStatsQuery;
82 private ScheduledFuture<?> scheduledResult;
/**
 * Creates the statistics poller and registers it as a listener on the operational datastore.
 *
 * @param dataBroker broker the listener is registered with
 * @param opendaylightDirectStatisticsService RPC service used to fetch statistics
 * @param entityOwnershipUtils used to check ownership of newly reported nodes
 * @param portNameCache cache used to resolve port names
 * @param interfaceChildCache cache used to resolve child interfaces of a port
 * @param ifmConfigObj configuration holding the poll-enabled flag and poll interval
 * @param metricProvider factory for the metric counters
 */
@Inject
public NodeConnectorStatsImpl(DataBroker dataBroker,
        final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService,
        final EntityOwnershipUtils entityOwnershipUtils,
        final PortNameCache portNameCache,
        final InterfaceChildCache interfaceChildCache,
        final IfmConfig ifmConfigObj,
        final MetricProvider metricProvider) {
    super(Node.class, NodeConnectorStatsImpl.class);
    this.opendaylightDirectStatisticsService = opendaylightDirectStatisticsService;
    this.entityOwnershipUtils = entityOwnershipUtils;
    this.portNameCache = portNameCache;
    this.interfaceChildCache = interfaceChildCache;
    this.ifmConfig = ifmConfigObj;
    this.metricProvider = metricProvider;
    // Create the executor BEFORE registering the listener: registration hands out "this", and
    // an early add() callback would otherwise call schedulePortStatRequestTask() while the
    // executor field is still null.
    this.portStatExecutorService =
            newListeningScheduledThreadPool(THREAD_POOL_SIZE, "Port Stats Request Task", LOG);
    registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker);
}
104 public InstanceIdentifier<Node> getWildCardPath() {
105 return InstanceIdentifier.create(Nodes.class).child(Node.class);
109 protected NodeConnectorStatsImpl getDataTreeChangeListener() {
110 return NodeConnectorStatsImpl.this;
115 public void close() {
116 // close the nested counter objects for each node
117 metricsCountersPerNodeMap.values().forEach(counterSet -> counterSet.forEach(UncheckedCloseable::close));
121 * PortStat request task is started when first DPN gets connected
123 private void schedulePortStatRequestTask() {
124 if (!ifmConfig.isIfmStatsPollEnabled()) {
125 LOG.info("Port statistics is turned off");
128 LOG.info("Scheduling port statistics request");
129 PortStatRequestTask portStatRequestTask = new PortStatRequestTask();
130 scheduledResult = portStatExecutorService.scheduleWithFixedDelay(portStatRequestTask,
131 ifmConfig.getIfmStatsDefPollInterval(), ifmConfig.getIfmStatsDefPollInterval(), TimeUnit.MINUTES);
135 * PortStat request task is stopped when last DPN is removed.
137 private void stopPortStatRequestTask() {
138 if (scheduledResult != null) {
139 LOG.info("Stopping port statistics request");
140 scheduledResult.cancel(true);
145 * This task queries for node connector statistics as well as flowtables
146 * statistics every 10 secs. Minimum period which can be configured for
149 private class PortStatRequestTask implements Runnable {
153 if (LOG.isTraceEnabled()) {
154 LOG.trace("Requesting port stats - {}");
156 for (String node : nodes) {
157 LOG.trace("Requesting AllNodeConnectorStatistics and flow table statistics for node - {}", node);
158 // Call RPC to Get NodeConnector Stats for node
159 ListenableFuture<RpcResult<GetNodeConnectorStatisticsOutput>> ncStatsFuture =
160 opendaylightDirectStatisticsService.getNodeConnectorStatistics(
161 buildGetNodeConnectorStatisticsInput(node));
163 Futures.addCallback(ncStatsFuture, new FutureCallback<RpcResult<GetNodeConnectorStatisticsOutput>>() {
166 public void onFailure(@Nonnull Throwable error) {
167 LOG.error("getNodeConnectorStatistics RPC failed for node: {} ", node, error);
171 public void onSuccess(RpcResult<GetNodeConnectorStatisticsOutput> result) {
172 if (result != null) {
173 if (result.isSuccessful()) {
174 GetNodeConnectorStatisticsOutput ncStatsRpcResult = result.getResult();
175 // process NodeConnectorStatistics RPC result
176 processNodeConnectorStatistics(ncStatsRpcResult, node);
178 LOG.error("getNodeConnectorStatistics RPC failed for node: {} with error: {}",
179 node, result.getErrors());
183 }, MoreExecutors.directExecutor());
185 // Call RPC to Get flow stats for node
186 ListenableFuture<RpcResult<GetFlowStatisticsOutput>> flowStatsFuture =
187 opendaylightDirectStatisticsService.getFlowStatistics(buildGetFlowStatisticsInput(node));
189 Futures.addCallback(flowStatsFuture, new FutureCallback<RpcResult<GetFlowStatisticsOutput>>() {
192 public void onFailure(@Nonnull Throwable error) {
193 LOG.error("getFlowStatistics RPC failed for node: {} ", node, error);
197 public void onSuccess(RpcResult<GetFlowStatisticsOutput> result) {
198 if (result != null) {
199 if (result.isSuccessful()) {
200 GetFlowStatisticsOutput flowStatsRpcResult = result.getResult();
201 // process FlowStatistics RPC result
202 processFlowStatistics(flowStatsRpcResult, node);
204 LOG.error("getFlowStatistics RPC failed for node: {} with error: {}",
205 node, result.getErrors());
209 }, MoreExecutors.directExecutor());
216 * The delay is added to spread the RPC call of the switches to query statistics
217 * across the polling interval.
218 * delay factor is calculated by dividing pollinginterval by no.of.switches.
220 private void delay() {
222 Thread.sleep(TimeUnit.SECONDS.toMillis(delayStatsQuery));
223 } catch (InterruptedException ex) {
224 LOG.error("InterruptedException");
229 * This method builds GetNodeConnectorStatisticsInput which is input for NodeConnectorStatistics RPC.
231 private GetNodeConnectorStatisticsInput buildGetNodeConnectorStatisticsInput(String dpId) {
232 return new GetNodeConnectorStatisticsInputBuilder()
233 .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
234 .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build()))
239 * This method builds GetFlowStatisticsInput which is input for FlowStatistics RPC.
241 private GetFlowStatisticsInput buildGetFlowStatisticsInput(String dpId) {
242 return new GetFlowStatisticsInputBuilder()
243 .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
244 .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build()))
250 * This method processes NodeConnectorStatistics RPC result.
252 * - fetches various OF Port counters values
253 * - creates/updates new OF Port counters using Infrautils metrics API
254 * - set counter with values fetched from NodeConnectorStatistics
256 private void processNodeConnectorStatistics(GetNodeConnectorStatisticsOutput nodeConnectorStatisticsOutput,
259 String portUuid = "";
260 List<NodeConnectorStatisticsAndPortNumberMap> ncStatsAndPortMapList = nodeConnectorStatisticsOutput
261 .getNodeConnectorStatisticsAndPortNumberMap();
262 // Parse NodeConnectorStatistics and create/update counters for them
263 for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : nullToEmpty(ncStatsAndPortMapList)) {
264 NodeConnectorId nodeConnector = ncStatsAndPortMap.getNodeConnectorId();
265 LOG.trace("Create/update metric counter for NodeConnector: {} of node: {}", nodeConnector, dpid);
266 port = nodeConnector.getValue();
267 // update port name as per port name maintained in portNameCache
268 String portNameInCache = "openflow" + ":" + dpid + ":" + port;
269 java.util.Optional<String> portName = portNameCache.get(portNameInCache);
270 if (portName.isPresent()) {
271 Optional<List<InterfaceChildEntry>> interfaceChildEntries = interfaceChildCache
272 .getInterfaceChildEntries(portName.get());
273 if (interfaceChildEntries.isPresent()) {
274 if (!interfaceChildEntries.get().isEmpty()) {
275 portUuid = interfaceChildEntries.get().get(0).getChildInterface();
276 LOG.trace("Retrieved portUuid {} for portname {}", portUuid, portName.get());
278 LOG.trace("PortUuid is not found for portname {}. Skipping IFM counters publish for this port.",
283 LOG.trace("PortUuid is not found for portname {}. Skipping IFM counters publish for this port.",
289 Counter counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_DURATION, dpid, port, portUuid,null);
290 long ofPortDuration = ncStatsAndPortMap.getDuration().getSecond().getValue();
291 updateCounter(counter, ofPortDuration);
293 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVDROP, dpid, port, portUuid, null);
294 long packetsPerOFPortReceiveDrop = ncStatsAndPortMap.getReceiveDrops().longValue();
295 updateCounter(counter, packetsPerOFPortReceiveDrop);
297 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVERROR, dpid, port, portUuid, null);
298 long packetsPerOFPortReceiveError = ncStatsAndPortMap.getReceiveErrors().longValue();
299 updateCounter(counter, packetsPerOFPortReceiveError);
301 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_SENT, dpid, port, portUuid, null);
302 long packetsPerOFPortSent = ncStatsAndPortMap.getPackets().getTransmitted().longValue();
303 updateCounter(counter, packetsPerOFPortSent);
305 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECV, dpid, port, portUuid, null);
306 long packetsPerOFPortReceive = ncStatsAndPortMap.getPackets().getReceived().longValue();
307 updateCounter(counter, packetsPerOFPortReceive);
309 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_SENT, dpid, port, portUuid, null);
310 long bytesPerOFPortSent = ncStatsAndPortMap.getBytes().getTransmitted().longValue();
311 updateCounter(counter, bytesPerOFPortSent);
313 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_RECV, dpid, port, portUuid, null);
314 long bytesPerOFPortReceive = ncStatsAndPortMap.getBytes().getReceived().longValue();
315 updateCounter(counter, bytesPerOFPortReceive);
320 * This method processes FlowStatistics RPC result.
322 * - fetches all flows of node
323 * - stores flows count per table in local map
324 * - creates/updates Flow table counters using Infrautils metrics API
325 * - set counter with values fetched from FlowStatistics
327 private void processFlowStatistics(GetFlowStatisticsOutput flowStatsOutput, String dpid) {
328 Map<Short, AtomicInteger> flowTableMap = new HashMap<>();
329 // Get all flows for node from RPC result
330 List<FlowAndStatisticsMapList> flowTableAndStatisticsMapList = flowStatsOutput.getFlowAndStatisticsMapList();
331 for (FlowAndStatisticsMapList flowAndStatisticsMap : nullToEmpty(flowTableAndStatisticsMapList)) {
332 short tableId = flowAndStatisticsMap.getTableId();
333 // populate map to maintain flow count per table
334 flowTableMap.computeIfAbsent(tableId, key -> new AtomicInteger(0)).incrementAndGet();
336 LOG.trace("FlowTableStatistics (tableId:counter): {} for node: {}", flowTableMap.entrySet(), dpid);
337 for (Map.Entry<Short, AtomicInteger> flowTable : flowTableMap.entrySet()) {
338 Short tableId = flowTable.getKey();
339 AtomicInteger flowCount = flowTable.getValue();
340 Counter counter = getCounter(CounterConstants.IFM_FLOW_TBL_COUNTER_FLOWS_PER_TBL, dpid, null, null,
342 // update counter value
343 updateCounter(counter, flowCount.longValue());
348 * This method returns counter and also creates counter if does not exist.
350 * @param counterName name of the counter
351 * @param switchId datapath-id value
352 * @param port port-id value
353 * @param aliasId alias-id value
354 * @param tableId table-id value of switch
355 * @return counter object
357 private Counter getCounter(String counterName, String switchId, String port, String aliasId, String tableId) {
359 * Pattern to be followed for key generation:
361 * genius.interfacemanager.entitycounter{entitytype=port,switchid=value,portid=value,aliasid=value,
363 * genius.interfacemanager.entitycounter{entitytype=flowtable,switchid=value,flowtableid=value,name=counterName}
365 Counter counter = null;
367 Labeled<Labeled<Labeled<Labeled<Labeled<Counter>>>>> labeledCounter =
368 metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius")
369 .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
370 CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
371 CounterConstants.LBL_KEY_PORTID, CounterConstants.LBL_KEY_ALIASID,
372 CounterConstants.LBL_KEY_COUNTER_NAME);
373 counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_PORT).label(switchId)
374 .label(port).label(aliasId).label(counterName);
376 if (tableId != null) {
377 Labeled<Labeled<Labeled<Labeled<Counter>>>> labeledCounter =
378 metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius")
379 .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
380 CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
381 CounterConstants.LBL_KEY_FLOWTBLID, CounterConstants.LBL_KEY_COUNTER_NAME);
382 counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_FLOWTBL).label(switchId)
383 .label(tableId).label(counterName);
386 // create counters set for node if absent.
387 // and then populate counter set with counter object
388 // which will be needed to close counters when node is removed.
389 metricsCountersPerNodeMap.computeIfAbsent(switchId, counterSet -> ConcurrentHashMap.newKeySet()).add(counter);
395 * This method updates counter values.
397 private void updateCounter(Counter counter, long counterValue) {
399 // reset counter to zero
400 counter.decrement(counter.get());
401 // set counter to specified value
402 counter.increment(counterValue);
403 } catch (IllegalStateException e) {
404 LOG.error("Metric counter ({}) update has got exception: ", counter, e);
409 protected void remove(InstanceIdentifier<Node> identifier, Node node) {
410 NodeId nodeId = node.getId();
411 String dpId = nodeId.getValue().split(":")[1];
412 if (nodes.contains(dpId)) {
414 // remove counters set from node
415 Set<Counter> nodeMetricCounterSet = metricsCountersPerNodeMap.remove(dpId);
416 if (nodeMetricCounterSet != null) {
418 nodeMetricCounterSet.forEach(UncheckedCloseable::close);
421 if (nodes.size() > 0) {
422 delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval() / nodes.size();
424 stopPortStatRequestTask();
430 protected void update(InstanceIdentifier<Node> identifier, Node original, Node update) {
431 // TODO Auto-generated method stub
435 protected void add(InstanceIdentifier<Node> identifier, Node node) {
436 NodeId nodeId = node.getId();
437 if (entityOwnershipUtils.isEntityOwner(IfmConstants.SERVICE_ENTITY_TYPE, nodeId.getValue())) {
438 LOG.trace("Locally connected switch {}",nodeId.getValue());
439 String dpId = nodeId.getValue().split(":")[1];
440 if (nodes.contains(dpId)) {
444 delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval() / nodes.size();
445 if (nodes.size() == 1) {
446 schedulePortStatRequestTask();
449 LOG.trace("Not a locally connected switch {}",nodeId.getValue());