X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=interfacemanager%2Finterfacemanager-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fgenius%2Finterfacemanager%2Fpmcounters%2FNodeConnectorStatsImpl.java;h=3627b6a3bda20add7392705bbfaa7c460f8c5187;hb=b3cc98970df1812b1425b9e578bcae63d71ed6f0;hp=c29e575778f2db5d950a14a12f03dea7b7dbfc7f;hpb=cb881e1ac3d42b5ac3a90d68fb11ebb0a5fe17d9;p=genius.git diff --git a/interfacemanager/interfacemanager-impl/src/main/java/org/opendaylight/genius/interfacemanager/pmcounters/NodeConnectorStatsImpl.java b/interfacemanager/interfacemanager-impl/src/main/java/org/opendaylight/genius/interfacemanager/pmcounters/NodeConnectorStatsImpl.java index c29e57577..3627b6a3b 100644 --- a/interfacemanager/interfacemanager-impl/src/main/java/org/opendaylight/genius/interfacemanager/pmcounters/NodeConnectorStatsImpl.java +++ b/interfacemanager/interfacemanager-impl/src/main/java/org/opendaylight/genius/interfacemanager/pmcounters/NodeConnectorStatsImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others. All rights reserved. + * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -7,281 +7,446 @@ */ package org.opendaylight.genius.interfacemanager.pmcounters; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.math.BigInteger; -import java.util.ArrayList; +import static org.opendaylight.genius.interfacemanager.IfmUtil.nullToEmpty; +import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningScheduledThreadPool; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nonnull; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.NotificationService; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.genius.mdsalutil.AbstractDataChangeListener; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdate; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap; +import org.opendaylight.genius.datastoreutils.AsyncClusteredDataTreeChangeListenerBase; +import org.opendaylight.genius.interfacemanager.IfmConstants; +import org.opendaylight.genius.interfacemanager.listeners.InterfaceChildCache; +import org.opendaylight.genius.interfacemanager.listeners.PortNameCache; +import org.opendaylight.genius.utils.clustering.EntityOwnershipUtils; +import org.opendaylight.infrautils.metrics.Counter; +import org.opendaylight.infrautils.metrics.Labeled; +import org.opendaylight.infrautils.metrics.MetricDescriptor; +import org.opendaylight.infrautils.metrics.MetricProvider; +import org.opendaylight.infrautils.utils.UncheckedCloseable; +import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.OpendaylightDirectStatisticsService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList; +import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.config.rev160406.IfmConfig; +import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.meta.rev160406._interface.child.info._interface.parent.entry.InterfaceChildEntry; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate; -import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener; -import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class NodeConnectorStatsImpl extends AbstractDataChangeListener{ +@Singleton +public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListenerBase { + + private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorStatsImpl.class); - private static final Logger logger = LoggerFactory.getLogger(NodeConnectorStatsImpl.class); - private static final String STATS_POLL_FLAG = "interfacemgr.pmcounters.poll"; private static final int THREAD_POOL_SIZE = 4; - private static final int NO_DELAY = 0; - public static final PMAgentForNodeConnectorCounters pmagent = new PMAgentForNodeConnectorCounters(); - private PortRpcStatisticsListener portStatsListener = new PortRpcStatisticsListener(); - private FlowRpcStatisticsListener flowTableStatsListener = new FlowRpcStatisticsListener(); - private List nodes = new ArrayList<>(); - Map> nodeAndNcIdOFPortDurationMap = new ConcurrentHashMap<>(); - Map> nodeAndNcIdOFPortReceiveDropMap = new ConcurrentHashMap<>(); - Map> nodeAndNcIdOFPortReceiveError = new ConcurrentHashMap<>(); - Map> nodeAndNcIdPacketSentMap = new ConcurrentHashMap<>(); - Map> nodeAndNcIdPacketReceiveMap = new ConcurrentHashMap<>(); - Map> nodeAndNcIdBytesSentMap = new ConcurrentHashMap<>(); - Map> nodeAndNcIdBytesReceiveMap = new ConcurrentHashMap<>(); - Map> nodeAndEntriesPerOFTableMap = new ConcurrentHashMap<>(); + private final Set nodes = ConcurrentHashMap.newKeySet(); + private final Map> metricsCountersPerNodeMap = new ConcurrentHashMap<>(); + private final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService; + private final ScheduledExecutorService portStatExecutorService; + private final EntityOwnershipUtils entityOwnershipUtils; + private final PortNameCache portNameCache; + private final InterfaceChildCache interfaceChildCache; + private final IfmConfig ifmConfig; + private final MetricProvider metricProvider; + + private volatile int delayStatsQuery; private ScheduledFuture scheduledResult; - private OpendaylightPortStatisticsService statPortService; - private ScheduledExecutorService portStatExecutorService; - private OpendaylightFlowTableStatisticsService opendaylightFlowTableStatisticsService; - public NodeConnectorStatsImpl(DataBroker db, NotificationService notificationService, OpendaylightPortStatisticsService statPortService, OpendaylightFlowTableStatisticsService opendaylightFlowTableStatisticsService) { - super(Node.class); - this.statPortService = statPortService; - this.opendaylightFlowTableStatisticsService = opendaylightFlowTableStatisticsService; - registerListener(db); - portStatExecutorService = Executors.newScheduledThreadPool(THREAD_POOL_SIZE, getThreadFactory("Port Stats Request Task")); - notificationService.registerNotificationListener(portStatsListener); - notificationService.registerNotificationListener(flowTableStatsListener); - pmagent.registerMbean(); - } - private void registerListener(final DataBroker db) { - try { - db.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, - getWildCardPath(), NodeConnectorStatsImpl.this, AsyncDataBroker.DataChangeScope.SUBTREE); - } catch (final Exception e) { - logger.error("NodeConnectorStatsImpl: DataChange listener registration fail!", e); - throw new IllegalStateException("NodeConnectorStatsImpl: registration Listener failed.", e); - } + @Inject + public NodeConnectorStatsImpl(DataBroker dataBroker, + final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService, + final EntityOwnershipUtils entityOwnershipUtils, + final PortNameCache portNameCache, + final InterfaceChildCache interfaceChildCache, + final IfmConfig ifmConfigObj, + final MetricProvider metricProvider) { + super(Node.class, NodeConnectorStatsImpl.class); + this.opendaylightDirectStatisticsService = opendaylightDirectStatisticsService; + this.entityOwnershipUtils = entityOwnershipUtils; + this.portNameCache = portNameCache; + this.interfaceChildCache = interfaceChildCache; + this.ifmConfig = ifmConfigObj; + this.metricProvider = metricProvider; + registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker); + portStatExecutorService = newListeningScheduledThreadPool(THREAD_POOL_SIZE, "Port Stats Request Task", LOG); } - private InstanceIdentifier getWildCardPath() { + @Override + public InstanceIdentifier getWildCardPath() { return InstanceIdentifier.create(Nodes.class).child(Node.class); } + @Override + protected NodeConnectorStatsImpl getDataTreeChangeListener() { + return NodeConnectorStatsImpl.this; + } + + @Override + @PreDestroy + public void close() { + // close the nested counter objects for each node + metricsCountersPerNodeMap.values().forEach(counterSet -> counterSet.forEach(UncheckedCloseable::close)); + } + /* * PortStat request task is started when first DPN gets connected */ private void schedulePortStatRequestTask() { - if (!Boolean.getBoolean(STATS_POLL_FLAG)) { - logger.info("Port statistics is turned off"); + if (!ifmConfig.isIfmStatsPollEnabled()) { + LOG.info("Port statistics is turned off"); return; } - logger.info("Scheduling port statistics request"); + LOG.info("Scheduling port statistics request"); PortStatRequestTask portStatRequestTask = new PortStatRequestTask(); - scheduledResult = portStatExecutorService.scheduleAtFixedRate(portStatRequestTask, NO_DELAY, 10000, TimeUnit.MILLISECONDS); + scheduledResult = portStatExecutorService.scheduleWithFixedDelay(portStatRequestTask, + ifmConfig.getIfmStatsDefPollInterval(), ifmConfig.getIfmStatsDefPollInterval(), TimeUnit.MINUTES); } /* * PortStat request task is stopped when last DPN is removed. */ private void stopPortStatRequestTask() { - if(scheduledResult != null) { - logger.info("Stopping port statistics request"); + if (scheduledResult != null) { + LOG.info("Stopping port statistics request"); scheduledResult.cancel(true); } } /* - * This task queries for node connector statistics as well as flowtables statistics every 10 secs. - * Minimum period which can be configured for PMJob is 10 secs. + * This task queries for node connector statistics as well as flowtables + * statistics every 10 secs. Minimum period which can be configured for + * PMJob is 10 secs. */ private class PortStatRequestTask implements Runnable { @Override public void run() { - if(logger.isTraceEnabled()) { - logger.trace("Requesting port stats - {}"); + if (LOG.isTraceEnabled()) { + LOG.trace("Requesting port stats - {}"); } - for (BigInteger node : nodes) { - logger.trace("Requesting AllNodeConnectorStatistics for node - {}", node); - statPortService.getAllNodeConnectorsStatistics(buildGetAllNodeConnectorStatistics(node)); - opendaylightFlowTableStatisticsService.getFlowTablesStatistics(buildGetFlowTablesStatistics(node)); + for (String node : nodes) { + LOG.trace("Requesting AllNodeConnectorStatistics and flow table statistics for node - {}", node); + // Call RPC to Get NodeConnector Stats for node + ListenableFuture> ncStatsFuture = + opendaylightDirectStatisticsService.getNodeConnectorStatistics( + buildGetNodeConnectorStatisticsInput(node)); + + Futures.addCallback(ncStatsFuture, new FutureCallback>() { + + @Override + public void onFailure(@Nonnull Throwable error) { + LOG.error("getNodeConnectorStatistics RPC failed for node: {} ", node, error); + } + + @Override + public void onSuccess(RpcResult result) { + if (result != null) { + if (result.isSuccessful()) { + GetNodeConnectorStatisticsOutput ncStatsRpcResult = result.getResult(); + // process NodeConnectorStatistics RPC result + processNodeConnectorStatistics(ncStatsRpcResult, node); + } else { + LOG.error("getNodeConnectorStatistics RPC failed for node: {} with error: {}", + node, result.getErrors()); + } + } + } + }, MoreExecutors.directExecutor()); + + // Call RPC to Get flow stats for node + ListenableFuture> flowStatsFuture = + opendaylightDirectStatisticsService.getFlowStatistics(buildGetFlowStatisticsInput(node)); + + Futures.addCallback(flowStatsFuture, new FutureCallback>() { + + @Override + public void onFailure(@Nonnull Throwable error) { + LOG.error("getFlowStatistics RPC failed for node: {} ", node, error); + } + + @Override + public void onSuccess(RpcResult result) { + if (result != null) { + if (result.isSuccessful()) { + GetFlowStatisticsOutput flowStatsRpcResult = result.getResult(); + // process FlowStatistics RPC result + processFlowStatistics(flowStatsRpcResult, node); + } else { + LOG.error("getFlowStatistics RPC failed for node: {} with error: {}", + node, result.getErrors()); + } + } + } + }, MoreExecutors.directExecutor()); + + delay(); } } - private GetAllNodeConnectorsStatisticsInput buildGetAllNodeConnectorStatistics(BigInteger dpId) { - return new GetAllNodeConnectorsStatisticsInputBuilder() - .setNode( - new NodeRef(InstanceIdentifier.builder(Nodes.class) - .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId.toString()))).build())).build(); + /** + * The delay is added to spread the RPC call of the switches to query statistics + * across the polling interval. + * delay factor is calculated by dividing pollinginterval by no.of.switches. + */ + private void delay() { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(delayStatsQuery)); + } catch (InterruptedException ex) { + LOG.error("InterruptedException"); + } } - private GetFlowTablesStatisticsInput buildGetFlowTablesStatistics(BigInteger dpId) { - return new GetFlowTablesStatisticsInputBuilder() - .setNode( - new NodeRef(InstanceIdentifier.builder(Nodes.class) - .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId.toString()))).build())).build(); + /** + * This method builds GetNodeConnectorStatisticsInput which is input for NodeConnectorStatistics RPC. + */ + private GetNodeConnectorStatisticsInput buildGetNodeConnectorStatisticsInput(String dpId) { + return new GetNodeConnectorStatisticsInputBuilder() + .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class) + .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build())) + .build(); } + /** + * This method builds GetFlowStatisticsInput which is input for FlowStatistics RPC. + */ + private GetFlowStatisticsInput buildGetFlowStatisticsInput(String dpId) { + return new GetFlowStatisticsInputBuilder() + .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class) + .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build())) + .build(); + } } - private ThreadFactory getThreadFactory(String threadNameFormat) { - ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); - builder.setNameFormat(threadNameFormat); - builder.setUncaughtExceptionHandler( - (t, e) -> logger.error("Received Uncaught Exception event in Thread: {}", t.getName(), e)); - return builder.build(); - } - - /* - * PortRpcStatisticsListener listens for the NodeConnectorStatisticsUpdate and then update the corresponding counter map + /** + * This method processes NodeConnectorStatistics RPC result. + * It performs: + * - fetches various OF Port counters values + * - creates/updates new OF Port counters using Infrautils metrics API + * - set counter with values fetched from NodeConnectorStatistics */ - class PortRpcStatisticsListener implements OpendaylightPortStatisticsListener { - - @Override - public void onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate ncStats) { - Map ncIdOFPortDurationMap = new HashMap<>(); - Map ncIdOFPortReceiveDropMap = new HashMap<>(); - Map ncIdOFPortReceiveError = new HashMap<>(); - Map ncIdPacketSentMap = new HashMap<>(); - Map ncIdPacketReceiveMap = new HashMap<>(); - Map ncIdBytesSentMap = new HashMap<>(); - Map ncIdBytesReceiveMap = new HashMap<>(); - List ncStatsAndPortMapList = ncStats.getNodeConnectorStatisticsAndPortNumberMap(); - NodeId nodeId = ncStats.getId(); - String node = nodeId.getValue().split(":")[1]; - for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : ncStatsAndPortMapList) { - NodeConnectorId nodeConnector = ncStatsAndPortMap.getNodeConnectorId(); - String port = nodeConnector.getValue().split(":")[2]; - String nodePortStr = "dpnId_" + node + "_portNum_" + port; - ncIdOFPortDurationMap.put("OFPortDuration:" + nodePortStr + "_OFPortDuration", ncStatsAndPortMap.getDuration().getSecond().getValue().toString()); - ncIdOFPortReceiveDropMap.put("PacketsPerOFPortReceiveDrop:" + nodePortStr + "_PacketsPerOFPortReceiveDrop", ncStatsAndPortMap.getReceiveDrops().toString()); - ncIdOFPortReceiveError.put("PacketsPerOFPortReceiveError:" + nodePortStr + "_PacketsPerOFPortReceiveError", ncStatsAndPortMap.getReceiveErrors().toString()); - ncIdPacketSentMap.put("PacketsPerOFPortSent:" + nodePortStr + "_PacketsPerOFPortSent", ncStatsAndPortMap.getPackets().getTransmitted().toString()); - ncIdPacketReceiveMap.put("PacketsPerOFPortReceive:" + nodePortStr + "_PacketsPerOFPortReceive", ncStatsAndPortMap.getPackets().getReceived().toString()); - ncIdBytesSentMap.put("BytesPerOFPortSent:" + nodePortStr + "_BytesPerOFPortSent", ncStatsAndPortMap.getBytes().getTransmitted().toString()); - ncIdBytesReceiveMap.put("BytesPerOFPortReceive:" + nodePortStr + "_BytesPerOFPortReceive", ncStatsAndPortMap.getBytes().getReceived().toString()); + private void processNodeConnectorStatistics(GetNodeConnectorStatisticsOutput nodeConnectorStatisticsOutput, + String dpid) { + String port = ""; + String portUuid = ""; + List ncStatsAndPortMapList = nodeConnectorStatisticsOutput + .getNodeConnectorStatisticsAndPortNumberMap(); + // Parse NodeConnectorStatistics and create/update counters for them + for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : nullToEmpty(ncStatsAndPortMapList)) { + NodeConnectorId nodeConnector = ncStatsAndPortMap.getNodeConnectorId(); + LOG.trace("Create/update metric counter for NodeConnector: {} of node: {}", nodeConnector, dpid); + port = nodeConnector.getValue(); + // update port name as per port name maintained in portNameCache + String portNameInCache = "openflow" + ":" + dpid + ":" + port; + java.util.Optional portName = portNameCache.get(portNameInCache); + if (portName.isPresent()) { + Optional> interfaceChildEntries = interfaceChildCache + .getInterfaceChildEntries(portName.get()); + if (interfaceChildEntries.isPresent()) { + if (!interfaceChildEntries.get().isEmpty()) { + portUuid = interfaceChildEntries.get().get(0).getChildInterface(); + LOG.trace("Retrieved portUuid {} for portname {}", portUuid, portName.get()); + } else { + LOG.trace("PortUuid is not found for portname {}. Skipping IFM counters publish for this port.", + portName.get()); + continue; + } + } else { + LOG.trace("PortUuid is not found for portname {}. Skipping IFM counters publish for this port.", + portName.get()); + continue; + } } - logger.trace("Port Stats {}", ncStatsAndPortMapList); - //Storing allNodeConnectorStats(like ncIdOFPortDurationMap) in a map with key as node for easy removal and addition of allNodeConnectorStats. - nodeAndNcIdOFPortDurationMap.put(node, ncIdOFPortDurationMap); - nodeAndNcIdOFPortReceiveDropMap.put(node, ncIdOFPortReceiveDropMap); - nodeAndNcIdOFPortReceiveError.put(node, ncIdOFPortReceiveError); - nodeAndNcIdPacketSentMap.put(node, ncIdPacketSentMap); - nodeAndNcIdPacketReceiveMap.put(node, ncIdPacketReceiveMap); - nodeAndNcIdBytesSentMap.put(node, ncIdBytesSentMap); - nodeAndNcIdBytesReceiveMap.put(node, ncIdBytesReceiveMap); - //Combining the stats of all nodeconnectors in all nodes. This Map will be stored under MBean which will be queried as regular intervals. - ncIdOFPortDurationMap = combineAllNodesStats(nodeAndNcIdOFPortDurationMap); - ncIdOFPortReceiveDropMap = combineAllNodesStats(nodeAndNcIdOFPortReceiveDropMap); - ncIdOFPortReceiveError = combineAllNodesStats(nodeAndNcIdOFPortReceiveError); - ncIdPacketSentMap = combineAllNodesStats(nodeAndNcIdPacketSentMap); - ncIdPacketReceiveMap = combineAllNodesStats(nodeAndNcIdPacketReceiveMap); - ncIdBytesSentMap = combineAllNodesStats(nodeAndNcIdBytesSentMap); - ncIdBytesReceiveMap = combineAllNodesStats(nodeAndNcIdBytesReceiveMap); - pmagent.connectToPMAgent(ncIdOFPortDurationMap, ncIdOFPortReceiveDropMap, ncIdOFPortReceiveError, ncIdPacketSentMap, ncIdPacketReceiveMap, ncIdBytesSentMap, ncIdBytesReceiveMap); - } - /* - * Input allNodesStats contains statistics of all nodeConnectors of all nodes. Key is the node and values contains another map with key as node connector and value as statresult. - * Output will be a map with key as nodeconnector and value as the statresult. The key contains nodeconnectors of all the nodes. - */ + Counter counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_DURATION, dpid, port, portUuid,null); + long ofPortDuration = ncStatsAndPortMap.getDuration().getSecond().getValue(); + updateCounter(counter, ofPortDuration); + + counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVDROP, dpid, port, portUuid, null); + long packetsPerOFPortReceiveDrop = ncStatsAndPortMap.getReceiveDrops().longValue(); + updateCounter(counter, packetsPerOFPortReceiveDrop); + + counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVERROR, dpid, port, portUuid, null); + long packetsPerOFPortReceiveError = ncStatsAndPortMap.getReceiveErrors().longValue(); + updateCounter(counter, packetsPerOFPortReceiveError); + + counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_SENT, dpid, port, portUuid, null); + long packetsPerOFPortSent = ncStatsAndPortMap.getPackets().getTransmitted().longValue(); + updateCounter(counter, packetsPerOFPortSent); + + counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECV, dpid, port, portUuid, null); + long packetsPerOFPortReceive = ncStatsAndPortMap.getPackets().getReceived().longValue(); + updateCounter(counter, packetsPerOFPortReceive); + + counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_SENT, dpid, port, portUuid, null); + long bytesPerOFPortSent = ncStatsAndPortMap.getBytes().getTransmitted().longValue(); + updateCounter(counter, bytesPerOFPortSent); + + counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_RECV, dpid, port, portUuid, null); + long bytesPerOFPortReceive = ncStatsAndPortMap.getBytes().getReceived().longValue(); + updateCounter(counter, bytesPerOFPortReceive); + } } - private Map combineAllNodesStats(Map> allNodesStats) { - Map allNcsStatsMap = new HashMap<>(); - for (Map.Entry> entry : allNodesStats.entrySet()) { - Map ncStatsMap = entry.getValue(); - for (Map.Entry statResult : ncStatsMap.entrySet()) { - allNcsStatsMap.put(statResult.getKey(), statResult.getValue()); - } + /** + * This method processes FlowStatistics RPC result. + * It performs: + * - fetches all flows of node + * - stores flows count per table in local map + * - creates/updates Flow table counters using Infrautils metrics API + * - set counter with values fetched from FlowStatistics + */ + private void processFlowStatistics(GetFlowStatisticsOutput flowStatsOutput, String dpid) { + Map flowTableMap = new HashMap<>(); + // Get all flows for node from RPC result + List flowTableAndStatisticsMapList = flowStatsOutput.getFlowAndStatisticsMapList(); + for (FlowAndStatisticsMapList flowAndStatisticsMap : nullToEmpty(flowTableAndStatisticsMapList)) { + short tableId = flowAndStatisticsMap.getTableId(); + // populate map to maintain flow count per table + flowTableMap.computeIfAbsent(tableId, key -> new AtomicInteger(0)).incrementAndGet(); + } + LOG.trace("FlowTableStatistics (tableId:counter): {} for node: {}", flowTableMap.entrySet(), dpid); + for (Map.Entry flowTable : flowTableMap.entrySet()) { + Short tableId = flowTable.getKey(); + AtomicInteger flowCount = flowTable.getValue(); + Counter counter = getCounter(CounterConstants.IFM_FLOW_TBL_COUNTER_FLOWS_PER_TBL, dpid, null, null, + tableId.toString()); + // update counter value + updateCounter(counter, flowCount.longValue()); } - return allNcsStatsMap; } /* - * FlowRpcStatisticsListener listens for the FlowTableStatisticsUpdate and then update the corresponding counter map + * This method returns counter and also creates counter if does not exist. + * + * @param counterName name of the counter + * @param switchId datapath-id value + * @param port port-id value + * @param aliasId alias-id value + * @param tableId table-id value of switch + * @return counter object */ - class FlowRpcStatisticsListener implements OpendaylightFlowTableStatisticsListener { - - @Override - public void onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate flowTableStats) { - String node = flowTableStats.getId().getValue().split(":")[1]; - Map entriesPerOFTableMap = new HashMap<>(); - List flowTableAndStatisticsMapList = flowTableStats.getFlowTableAndStatisticsMap(); - for (FlowTableAndStatisticsMap flowTableAndStatisticsMap : flowTableAndStatisticsMapList) { - String nodeTableStr = "dpnId_" + node + "_table_" + flowTableAndStatisticsMap.getTableId().getValue().toString(); - entriesPerOFTableMap.put("EntriesPerOFTable:" + nodeTableStr + "_EntriesPerOFTable", flowTableAndStatisticsMap.getActiveFlows().getValue().toString()); - } - nodeAndEntriesPerOFTableMap.put(node, entriesPerOFTableMap); - entriesPerOFTableMap = combineAllNodesStats(nodeAndEntriesPerOFTableMap); - pmagent.connectToPMAgentAndInvokeEntriesPerOFTable(entriesPerOFTableMap); + private Counter getCounter(String counterName, String switchId, String port, String aliasId, String tableId) { + /* + * Pattern to be followed for key generation: + * + * genius.interfacemanager.entitycounter{entitytype=port,switchid=value,portid=value,aliasid=value, + * name=counterName} + * genius.interfacemanager.entitycounter{entitytype=flowtable,switchid=value,flowtableid=value,name=counterName} + */ + Counter counter = null; + if (port != null) { + Labeled>>>> labeledCounter = + metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius") + .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(), + CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID, + CounterConstants.LBL_KEY_PORTID, CounterConstants.LBL_KEY_ALIASID, + CounterConstants.LBL_KEY_COUNTER_NAME); + counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_PORT).label(switchId) + .label(port).label(aliasId).label(counterName); } + if (tableId != null) { + Labeled>>> labeledCounter = + metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius") + .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(), + CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID, + CounterConstants.LBL_KEY_FLOWTBLID, CounterConstants.LBL_KEY_COUNTER_NAME); + counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_FLOWTBL).label(switchId) + .label(tableId).label(counterName); + } + + // create counters set for node if absent. + // and then populate counter set with counter object + // which will be needed to close counters when node is removed. + metricsCountersPerNodeMap.computeIfAbsent(switchId, counterSet -> ConcurrentHashMap.newKeySet()).add(counter); + return counter; + } + + /** + * This method updates counter values. + */ + private void updateCounter(Counter counter, long counterValue) { + try { + // reset counter to zero + counter.decrement(counter.get()); + // set counter to specified value + counter.increment(counterValue); + } catch (IllegalStateException e) { + LOG.error("Metric counter ({}) update has got exception: ", counter, e); + } } @Override protected void remove(InstanceIdentifier identifier, Node node) { NodeId nodeId = node.getId(); - String nodeVal = nodeId.getValue().split(":")[1]; - BigInteger dpId = new BigInteger(nodeVal); + String dpId = nodeId.getValue().split(":")[1]; if (nodes.contains(dpId)) { nodes.remove(dpId); - nodeAndNcIdOFPortDurationMap.remove(nodeVal); - nodeAndNcIdOFPortReceiveDropMap.remove(nodeVal); - nodeAndNcIdOFPortReceiveError.remove(nodeVal); - nodeAndNcIdPacketSentMap.remove(nodeVal); - nodeAndNcIdPacketReceiveMap.remove(nodeVal); - nodeAndNcIdBytesSentMap.remove(nodeVal); - nodeAndNcIdBytesReceiveMap.remove(nodeVal); - nodeAndEntriesPerOFTableMap.remove(nodeVal); + // remove counters set from node + Set nodeMetricCounterSet = metricsCountersPerNodeMap.remove(dpId); + if (nodeMetricCounterSet != null) { + // remove counters + nodeMetricCounterSet.forEach(UncheckedCloseable::close); + } } - if (nodes.isEmpty()) { + if (nodes.size() > 0) { + delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval() / nodes.size(); + } else { stopPortStatRequestTask(); + delayStatsQuery = 0; } } @Override - protected void update(InstanceIdentifier identifier, Node original, - Node update) { + protected void update(InstanceIdentifier identifier, Node original, Node update) { // TODO Auto-generated method stub } @Override protected void add(InstanceIdentifier identifier, Node node) { NodeId nodeId = node.getId(); - BigInteger dpId = new BigInteger(nodeId.getValue().split(":")[1]); - if (nodes.contains(dpId)) { - return; - } - nodes.add(dpId); - if (nodes.size() == 1) { - schedulePortStatRequestTask(); + if (entityOwnershipUtils.isEntityOwner(IfmConstants.SERVICE_ENTITY_TYPE, nodeId.getValue())) { + LOG.trace("Locally connected switch {}",nodeId.getValue()); + String dpId = nodeId.getValue().split(":")[1]; + if (nodes.contains(dpId)) { + return; + } + nodes.add(dpId); + delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval() / nodes.size(); + if (nodes.size() == 1) { + schedulePortStatRequestTask(); + } + } else { + LOG.trace("Not a locally connected switch {}",nodeId.getValue()); } } }