2 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.genius.interfacemanager.pmcounters;
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.JdkFutureAdapters;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import com.google.common.util.concurrent.ThreadFactoryBuilder;
17 import java.math.BigInteger;
18 import java.util.HashMap;
19 import java.util.List;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.ScheduledFuture;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import javax.annotation.Nonnull;
31 import javax.annotation.PreDestroy;
32 import javax.inject.Inject;
33 import javax.inject.Singleton;
35 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
36 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
37 import org.opendaylight.genius.datastoreutils.AsyncClusteredDataTreeChangeListenerBase;
38 import org.opendaylight.genius.interfacemanager.IfmConstants;
39 import org.opendaylight.genius.interfacemanager.listeners.InterfaceChildCache;
40 import org.opendaylight.genius.interfacemanager.listeners.PortNameCache;
41 import org.opendaylight.genius.utils.clustering.EntityOwnershipUtils;
42 import org.opendaylight.infrautils.metrics.Counter;
43 import org.opendaylight.infrautils.metrics.Labeled;
44 import org.opendaylight.infrautils.metrics.MetricDescriptor;
45 import org.opendaylight.infrautils.metrics.MetricProvider;
46 import org.opendaylight.infrautils.utils.UncheckedCloseable;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInputBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInputBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsOutput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.OpendaylightDirectStatisticsService;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.config.rev160406.IfmConfig;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.meta.rev160406._interface.child.info._interface.parent.entry.InterfaceChildEntry;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
64 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
65 import org.opendaylight.yangtools.yang.common.RpcResult;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
70 public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListenerBase<Node, NodeConnectorStatsImpl> {
72 private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorStatsImpl.class);
74 private static final int THREAD_POOL_SIZE = 4;
75 private final Set<BigInteger> nodes = ConcurrentHashMap.newKeySet();
76 Map<BigInteger, Set<Counter>> metricsCountersPerNodeMap = new ConcurrentHashMap<>();
77 private final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService;
78 private final ScheduledExecutorService portStatExecutorService;
79 private final EntityOwnershipUtils entityOwnershipUtils;
80 private final PortNameCache portNameCache;
81 private final InterfaceChildCache interfaceChildCache;
82 private final IfmConfig ifmConfig;
83 private final MetricProvider metricProvider;
85 private volatile int delayStatsQuery;
86 private ScheduledFuture<?> scheduledResult;
89 public NodeConnectorStatsImpl(DataBroker dataBroker,
90 final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService,
91 final EntityOwnershipUtils entityOwnershipUtils,
92 final PortNameCache portNameCache,
93 final InterfaceChildCache interfaceChildCache,
94 final IfmConfig ifmConfigObj,
95 final MetricProvider metricProvider) {
96 super(Node.class, NodeConnectorStatsImpl.class);
97 this.opendaylightDirectStatisticsService = opendaylightDirectStatisticsService;
98 this.entityOwnershipUtils = entityOwnershipUtils;
99 this.portNameCache = portNameCache;
100 this.interfaceChildCache = interfaceChildCache;
101 this.ifmConfig = ifmConfigObj;
102 this.metricProvider = metricProvider;
103 registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker);
104 portStatExecutorService = Executors.newScheduledThreadPool(THREAD_POOL_SIZE,
105 getThreadFactory("Port Stats " + "Request Task"));
109 public InstanceIdentifier<Node> getWildCardPath() {
110 return InstanceIdentifier.create(Nodes.class).child(Node.class);
114 protected NodeConnectorStatsImpl getDataTreeChangeListener() {
115 return NodeConnectorStatsImpl.this;
120 public void close() {
121 // close the nested counter objects for each node
122 metricsCountersPerNodeMap.values().forEach(counterSet -> counterSet.forEach(UncheckedCloseable::close));
126 * PortStat request task is started when first DPN gets connected
128 private void schedulePortStatRequestTask() {
129 if (!ifmConfig.isIfmStatsPollEnabled()) {
130 LOG.info("Port statistics is turned off");
133 LOG.info("Scheduling port statistics request");
134 PortStatRequestTask portStatRequestTask = new PortStatRequestTask();
135 scheduledResult = portStatExecutorService.scheduleWithFixedDelay(portStatRequestTask,
136 ifmConfig.getIfmStatsDefPollInterval(), ifmConfig.getIfmStatsDefPollInterval(), TimeUnit.MINUTES);
140 * PortStat request task is stopped when last DPN is removed.
142 private void stopPortStatRequestTask() {
143 if (scheduledResult != null) {
144 LOG.info("Stopping port statistics request");
145 scheduledResult.cancel(true);
150 * This task queries for node connector statistics as well as flowtables
151 * statistics every 10 secs. Minimum period which can be configured for
154 private class PortStatRequestTask implements Runnable {
158 if (LOG.isTraceEnabled()) {
159 LOG.trace("Requesting port stats - {}");
161 for (BigInteger node : nodes) {
162 LOG.trace("Requesting AllNodeConnectorStatistics and flow table statistics for node - {}", node);
163 // Call RPC to Get NodeConnector Stats for node
164 Future<RpcResult<GetNodeConnectorStatisticsOutput>> ncStatsFuture = opendaylightDirectStatisticsService
165 .getNodeConnectorStatistics(buildGetNodeConnectorStatisticsInput(node));
166 //Create ListenableFuture to get RPC result asynchronously
167 ListenableFuture<RpcResult<GetNodeConnectorStatisticsOutput>> ncStatsListenableFuture =
168 JdkFutureAdapters.listenInPoolThread(ncStatsFuture);
170 Futures.addCallback(ncStatsListenableFuture, new
171 FutureCallback<RpcResult<GetNodeConnectorStatisticsOutput>>() {
174 public void onFailure(@Nonnull Throwable error) {
175 LOG.error("getNodeConnectorStatistics RPC failed for node: {} ", node, error);
179 public void onSuccess(RpcResult<GetNodeConnectorStatisticsOutput> result) {
180 if (result != null) {
181 if (result.isSuccessful()) {
182 GetNodeConnectorStatisticsOutput ncStatsRpcResult = result.getResult();
183 // process NodeConnectorStatistics RPC result
184 processNodeConnectorStatistics(ncStatsRpcResult, node);
186 LOG.error("getNodeConnectorStatistics RPC failed for node: {} with error: {}",
187 node, result.getErrors());
191 }, MoreExecutors.directExecutor());
193 // Call RPC to Get flow stats for node
194 Future<RpcResult<GetFlowStatisticsOutput>> flowStatsFuture = opendaylightDirectStatisticsService
195 .getFlowStatistics(buildGetFlowStatisticsInput(node));
196 //Create ListenableFuture to get RPC result asynchronously
197 ListenableFuture<RpcResult<GetFlowStatisticsOutput>> flowStatsListenableFuture =
198 JdkFutureAdapters.listenInPoolThread(flowStatsFuture);
200 Futures.addCallback(flowStatsListenableFuture, new
201 FutureCallback<RpcResult<GetFlowStatisticsOutput>>() {
204 public void onFailure(@Nonnull Throwable error) {
205 LOG.error("getFlowStatistics RPC failed for node: {} ", node, error);
209 public void onSuccess(RpcResult<GetFlowStatisticsOutput> result) {
210 if (result != null) {
211 if (result.isSuccessful()) {
212 GetFlowStatisticsOutput flowStatsRpcResult = result.getResult();
213 // process FlowStatistics RPC result
214 processFlowStatistics(flowStatsRpcResult, node);
216 LOG.error("getFlowStatistics RPC failed for node: {} with error: {}",
217 node, result.getErrors());
221 }, MoreExecutors.directExecutor());
228 * The delay is added to spread the RPC call of the switches to query statistics
229 * across the polling interval.
230 * delay factor is calculated by dividing pollinginterval by no.of.switches.
232 private void delay() {
234 Thread.sleep(TimeUnit.SECONDS.toMillis(delayStatsQuery));
235 } catch (InterruptedException ex) {
236 LOG.error("InterruptedException");
241 * This method builds GetNodeConnectorStatisticsInput which is input for NodeConnectorStatistics RPC.
243 private GetNodeConnectorStatisticsInput buildGetNodeConnectorStatisticsInput(BigInteger dpId) {
244 return new GetNodeConnectorStatisticsInputBuilder()
245 .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
246 .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId.toString()))).build()))
251 * This method builds GetFlowStatisticsInput which is input for FlowStatistics RPC.
253 private GetFlowStatisticsInput buildGetFlowStatisticsInput(BigInteger dpId) {
254 return new GetFlowStatisticsInputBuilder()
255 .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
256 .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId.toString()))).build()))
261 private ThreadFactory getThreadFactory(String threadNameFormat) {
262 ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
263 builder.setNameFormat(threadNameFormat);
264 builder.setUncaughtExceptionHandler((thread, exception) -> LOG
265 .error("Received Uncaught Exception event in Thread: {}", thread.getName(), exception));
266 return builder.build();
270 * This method processes NodeConnectorStatistics RPC result.
272 * - fetches various OF Port counters values
273 * - creates/updates new OF Port counters using Infrautils metrics API
274 * - set counter with values fetched from NodeConnectorStatistics
276 private void processNodeConnectorStatistics(GetNodeConnectorStatisticsOutput nodeConnectorStatisticsOutput,
279 String portUuid = "";
280 List<NodeConnectorStatisticsAndPortNumberMap> ncStatsAndPortMapList = nodeConnectorStatisticsOutput
281 .getNodeConnectorStatisticsAndPortNumberMap();
282 // Parse NodeConnectorStatistics and create/update counters for them
283 for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : ncStatsAndPortMapList) {
284 NodeConnectorId nodeConnector = ncStatsAndPortMap.getNodeConnectorId();
285 LOG.trace("Create/update metric counter for NodeConnector: {} of node: {}", nodeConnector, dpid.toString());
286 port = nodeConnector.getValue();
287 // update port name as per port name maintained in portNameCache
288 String portNameInCache = "openflow" + ":" + dpid.toString() + ":" + port;
289 java.util.Optional<String> portName = portNameCache.get(portNameInCache);
290 if (portName.isPresent()) {
291 Optional<List<InterfaceChildEntry>> interfaceChildEntries = interfaceChildCache
292 .getInterfaceChildEntries(portName.get());
293 if (interfaceChildEntries.isPresent()) {
294 if (!interfaceChildEntries.get().isEmpty()) {
295 portUuid = interfaceChildEntries.get().get(0).getChildInterface();
296 LOG.trace("Retrieved portUuid {} for portname {}", portUuid, portName.get());
298 LOG.trace("PortUuid is not found for portname {}. Skipping IFM counters publish for this port.",
303 LOG.trace("PortUuid is not found for portname {}. Skipping IFM counters publish for this port.",
309 Counter counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_DURATION, dpid, port, portUuid,null);
310 long ofPortDuration = ncStatsAndPortMap.getDuration().getSecond().getValue();
311 updateCounter(counter, ofPortDuration);
313 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVDROP, dpid, port, portUuid, null);
314 long packetsPerOFPortReceiveDrop = ncStatsAndPortMap.getReceiveDrops().longValue();
315 updateCounter(counter, packetsPerOFPortReceiveDrop);
317 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVERROR, dpid, port, portUuid, null);
318 long packetsPerOFPortReceiveError = ncStatsAndPortMap.getReceiveErrors().longValue();
319 updateCounter(counter, packetsPerOFPortReceiveError);
321 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_SENT, dpid, port, portUuid, null);
322 long packetsPerOFPortSent = ncStatsAndPortMap.getPackets().getTransmitted().longValue();
323 updateCounter(counter, packetsPerOFPortSent);
325 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECV, dpid, port, portUuid, null);
326 long packetsPerOFPortReceive = ncStatsAndPortMap.getPackets().getReceived().longValue();
327 updateCounter(counter, packetsPerOFPortReceive);
329 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_SENT, dpid, port, portUuid, null);
330 long bytesPerOFPortSent = ncStatsAndPortMap.getBytes().getTransmitted().longValue();
331 updateCounter(counter, bytesPerOFPortSent);
333 counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_RECV, dpid, port, portUuid, null);
334 long bytesPerOFPortReceive = ncStatsAndPortMap.getBytes().getReceived().longValue();
335 updateCounter(counter, bytesPerOFPortReceive);
340 * This method processes FlowStatistics RPC result.
342 * - fetches all flows of node
343 * - stores flows count per table in local map
344 * - creates/updates Flow table counters using Infrautils metrics API
345 * - set counter with values fetched from FlowStatistics
347 private void processFlowStatistics(GetFlowStatisticsOutput flowStatsOutput, BigInteger dpid) {
348 Map<Short, AtomicInteger> flowTableMap = new HashMap<>();
349 // Get all flows for node from RPC result
350 List<FlowAndStatisticsMapList> flowTableAndStatisticsMapList = flowStatsOutput.getFlowAndStatisticsMapList();
351 for (FlowAndStatisticsMapList flowAndStatisticsMap : flowTableAndStatisticsMapList) {
352 short tableId = flowAndStatisticsMap.getTableId();
353 // populate map to maintain flow count per table
354 flowTableMap.computeIfAbsent(tableId, key -> new AtomicInteger(0)).incrementAndGet();
356 LOG.trace("FlowTableStatistics (tableId:counter): {} for node: {}", flowTableMap.entrySet(), dpid.toString());
357 for (Map.Entry<Short, AtomicInteger> flowTable : flowTableMap.entrySet()) {
358 Short tableId = flowTable.getKey();
359 AtomicInteger flowCount = flowTable.getValue();
360 Counter counter = getCounter(CounterConstants.IFM_FLOW_TBL_COUNTER_FLOWS_PER_TBL, dpid, null, null,
362 // update counter value
363 updateCounter(counter, flowCount.longValue());
368 * This method returns counter and also creates counter if does not exist.
370 * @param counterName name of the counter
371 * @param switchId datapath-id value
372 * @param port port-id value
373 * @param aliasId alias-id value
374 * @param tableId table-id value of switch
375 * @return counter object
377 private Counter getCounter(String counterName, BigInteger switchId, String port, String aliasId, String tableId) {
379 * Pattern to be followed for key generation:
381 * genius.interfacemanager.entitycounter{entitytype=port,switchid=value,portid=value,aliasid=value,
383 * genius.interfacemanager.entitycounter{entitytype=flowtable,switchid=value,flowtableid=value,name=counterName}
385 Counter counter = null;
387 Labeled<Labeled<Labeled<Labeled<Labeled<Counter>>>>> labeledCounter =
388 metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius")
389 .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
390 CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
391 CounterConstants.LBL_KEY_PORTID, CounterConstants.LBL_KEY_ALIASID,
392 CounterConstants.LBL_KEY_COUNTER_NAME);
393 counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_PORT).label(switchId.toString())
394 .label(port).label(aliasId).label(counterName);
396 if (tableId != null) {
397 Labeled<Labeled<Labeled<Labeled<Counter>>>> labeledCounter =
398 metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius")
399 .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
400 CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
401 CounterConstants.LBL_KEY_FLOWTBLID, CounterConstants.LBL_KEY_COUNTER_NAME);
402 counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_FLOWTBL).label(switchId.toString())
403 .label(tableId).label(counterName);
406 // create counters set for node if absent.
407 // and then populate counter set with counter object
408 // which will be needed to close counters when node is removed.
409 metricsCountersPerNodeMap.computeIfAbsent(switchId, counterSet -> ConcurrentHashMap.newKeySet()).add(counter);
415 * This method updates counter values.
417 private void updateCounter(Counter counter, long counterValue) {
419 // reset counter to zero
420 counter.decrement(counter.get());
421 // set counter to specified value
422 counter.increment(counterValue);
423 } catch (IllegalStateException e) {
424 LOG.error("Metric counter ({}) update has got exception: ", counter, e);
429 protected void remove(InstanceIdentifier<Node> identifier, Node node) {
430 NodeId nodeId = node.getId();
431 String nodeVal = nodeId.getValue().split(":")[1];
432 BigInteger dpId = new BigInteger(nodeVal);
433 if (nodes.contains(dpId)) {
435 // remove counters set from node
436 Set<Counter> nodeMetricCounterSet = metricsCountersPerNodeMap.remove(dpId);
437 if (nodeMetricCounterSet != null) {
439 nodeMetricCounterSet.forEach(UncheckedCloseable::close);
442 if (nodes.size() > 0) {
443 delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval() / nodes.size();
445 stopPortStatRequestTask();
451 protected void update(InstanceIdentifier<Node> identifier, Node original, Node update) {
452 // TODO Auto-generated method stub
456 protected void add(InstanceIdentifier<Node> identifier, Node node) {
457 NodeId nodeId = node.getId();
458 if (entityOwnershipUtils.isEntityOwner(IfmConstants.SERVICE_ENTITY_TYPE, nodeId.getValue())) {
459 LOG.trace("Locally connected switch {}",nodeId.getValue());
460 BigInteger dpId = new BigInteger(nodeId.getValue().split(":")[1]);
461 if (nodes.contains(dpId)) {
465 delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval() / nodes.size();
466 if (nodes.size() == 1) {
467 schedulePortStatRequestTask();
470 LOG.trace("Not a locally connected switch {}",nodeId.getValue());