*/
package org.opendaylight.genius.interfacemanager.pmcounters;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.math.BigInteger;
-import java.util.ArrayList;
+import static org.opendaylight.genius.interfacemanager.IfmUtil.nullToEmpty;
+import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningScheduledThreadPool;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nonnull;
+import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.NotificationService;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdate;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
+import org.opendaylight.genius.datastoreutils.AsyncClusteredDataTreeChangeListenerBase;
+import org.opendaylight.genius.interfacemanager.IfmConstants;
+import org.opendaylight.genius.interfacemanager.listeners.InterfaceChildCache;
+import org.opendaylight.genius.interfacemanager.listeners.PortNameCache;
+import org.opendaylight.genius.utils.clustering.EntityOwnershipUtils;
+import org.opendaylight.infrautils.metrics.Counter;
+import org.opendaylight.infrautils.metrics.Labeled;
+import org.opendaylight.infrautils.metrics.MetricDescriptor;
+import org.opendaylight.infrautils.metrics.MetricProvider;
+import org.opendaylight.infrautils.utils.UncheckedCloseable;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.OpendaylightDirectStatisticsService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.config.rev160406.IfmConfig;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.meta.rev160406._interface.child.info._interface.parent.entry.InterfaceChildEntry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Singleton
-public class NodeConnectorStatsImpl extends AsyncDataTreeChangeListenerBase<Node, NodeConnectorStatsImpl> {
+public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListenerBase<Node, NodeConnectorStatsImpl> {
+
private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorStatsImpl.class);
- private static final String STATS_POLL_FLAG = "interfacemgr.pmcounters.poll";
+
private static final int THREAD_POOL_SIZE = 4;
- private static final int NO_DELAY = 0;
- public static final PMAgentForNodeConnectorCounters PMAGENT = new PMAgentForNodeConnectorCounters();
- private final PortRpcStatisticsListener portStatsListener = new PortRpcStatisticsListener();
- private final FlowRpcStatisticsListener flowTableStatsListener = new FlowRpcStatisticsListener();
- private final List<BigInteger> nodes = new ArrayList<>();
- Map<String, Map<String, String>> nodeAndNcIdOFPortDurationMap = new ConcurrentHashMap<>();
- Map<String, Map<String, String>> nodeAndNcIdOFPortReceiveDropMap = new ConcurrentHashMap<>();
- Map<String, Map<String, String>> nodeAndNcIdOFPortReceiveError = new ConcurrentHashMap<>();
- Map<String, Map<String, String>> nodeAndNcIdPacketSentMap = new ConcurrentHashMap<>();
- Map<String, Map<String, String>> nodeAndNcIdPacketReceiveMap = new ConcurrentHashMap<>();
- Map<String, Map<String, String>> nodeAndNcIdBytesSentMap = new ConcurrentHashMap<>();
- Map<String, Map<String, String>> nodeAndNcIdBytesReceiveMap = new ConcurrentHashMap<>();
- Map<String, Map<String, String>> nodeAndEntriesPerOFTableMap = new ConcurrentHashMap<>();
- private ScheduledFuture<?> scheduledResult;
- private final OpendaylightPortStatisticsService statPortService;
+ private final Set<String> nodes = ConcurrentHashMap.newKeySet();
+ private final Map<String, Set<Counter>> metricsCountersPerNodeMap = new ConcurrentHashMap<>();
+ private final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService;
private final ScheduledExecutorService portStatExecutorService;
- private final OpendaylightFlowTableStatisticsService opendaylightFlowTableStatisticsService;
+ private final EntityOwnershipUtils entityOwnershipUtils;
+ private final PortNameCache portNameCache;
+ private final InterfaceChildCache interfaceChildCache;
+ private final IfmConfig ifmConfig;
+ private final MetricProvider metricProvider;
+
+ private volatile int delayStatsQuery;
+ private ScheduledFuture<?> scheduledResult;
@Inject
- public NodeConnectorStatsImpl(DataBroker dataBroker, NotificationService notificationService,
- final OpendaylightPortStatisticsService opendaylightPortStatisticsService,
- final OpendaylightFlowTableStatisticsService opendaylightFlowTableStatisticsService) {
+ public NodeConnectorStatsImpl(DataBroker dataBroker,
+ final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService,
+ final EntityOwnershipUtils entityOwnershipUtils,
+ final PortNameCache portNameCache,
+ final InterfaceChildCache interfaceChildCache,
+ final IfmConfig ifmConfigObj,
+ final MetricProvider metricProvider) {
super(Node.class, NodeConnectorStatsImpl.class);
- this.statPortService = opendaylightPortStatisticsService;
- this.opendaylightFlowTableStatisticsService = opendaylightFlowTableStatisticsService;
+ this.opendaylightDirectStatisticsService = opendaylightDirectStatisticsService;
+ this.entityOwnershipUtils = entityOwnershipUtils;
+ this.portNameCache = portNameCache;
+ this.interfaceChildCache = interfaceChildCache;
+ this.ifmConfig = ifmConfigObj;
+ this.metricProvider = metricProvider;
registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker);
- portStatExecutorService = Executors.newScheduledThreadPool(THREAD_POOL_SIZE,
- getThreadFactory("Port Stats " + "Request Task"));
- notificationService.registerNotificationListener(portStatsListener);
- notificationService.registerNotificationListener(flowTableStatsListener);
- PMAGENT.registerMbean();
+ portStatExecutorService = newListeningScheduledThreadPool(THREAD_POOL_SIZE, "Port Stats Request Task", LOG);
}
@Override
return NodeConnectorStatsImpl.this;
}
+ @Override
+ @PreDestroy
+ public void close() {
+ // close the nested counter objects for each node
+ metricsCountersPerNodeMap.values().forEach(counterSet -> counterSet.forEach(UncheckedCloseable::close));
+ }
+
/*
* PortStat request task is started when first DPN gets connected
*/
private void schedulePortStatRequestTask() {
- if (!Boolean.getBoolean(STATS_POLL_FLAG)) {
+ if (!ifmConfig.isIfmStatsPollEnabled()) {
LOG.info("Port statistics is turned off");
return;
}
LOG.info("Scheduling port statistics request");
PortStatRequestTask portStatRequestTask = new PortStatRequestTask();
- scheduledResult = portStatExecutorService.scheduleAtFixedRate(portStatRequestTask, NO_DELAY, 10000,
- TimeUnit.MILLISECONDS);
+ scheduledResult = portStatExecutorService.scheduleWithFixedDelay(portStatRequestTask,
+ ifmConfig.getIfmStatsDefPollInterval(), ifmConfig.getIfmStatsDefPollInterval(), TimeUnit.MINUTES);
}
/*
if (LOG.isTraceEnabled()) {
LOG.trace("Requesting port stats - {}");
}
- for (BigInteger node : nodes) {
- LOG.trace("Requesting AllNodeConnectorStatistics for node - {}", node);
- statPortService.getAllNodeConnectorsStatistics(buildGetAllNodeConnectorStatistics(node));
- opendaylightFlowTableStatisticsService.getFlowTablesStatistics(buildGetFlowTablesStatistics(node));
+ for (String node : nodes) {
+ LOG.trace("Requesting AllNodeConnectorStatistics and flow table statistics for node - {}", node);
+ // Call RPC to Get NodeConnector Stats for node
+ ListenableFuture<RpcResult<GetNodeConnectorStatisticsOutput>> ncStatsFuture =
+ opendaylightDirectStatisticsService.getNodeConnectorStatistics(
+ buildGetNodeConnectorStatisticsInput(node));
+
+ Futures.addCallback(ncStatsFuture, new FutureCallback<RpcResult<GetNodeConnectorStatisticsOutput>>() {
+
+ @Override
+ public void onFailure(@Nonnull Throwable error) {
+ LOG.error("getNodeConnectorStatistics RPC failed for node: {} ", node, error);
+ }
+
+ @Override
+ public void onSuccess(RpcResult<GetNodeConnectorStatisticsOutput> result) {
+ if (result != null) {
+ if (result.isSuccessful()) {
+ GetNodeConnectorStatisticsOutput ncStatsRpcResult = result.getResult();
+ // process NodeConnectorStatistics RPC result
+ processNodeConnectorStatistics(ncStatsRpcResult, node);
+ } else {
+ LOG.error("getNodeConnectorStatistics RPC failed for node: {} with error: {}",
+ node, result.getErrors());
+ }
+ }
+ }
+ }, MoreExecutors.directExecutor());
+
+ // Call RPC to Get flow stats for node
+ ListenableFuture<RpcResult<GetFlowStatisticsOutput>> flowStatsFuture =
+ opendaylightDirectStatisticsService.getFlowStatistics(buildGetFlowStatisticsInput(node));
+
+ Futures.addCallback(flowStatsFuture, new FutureCallback<RpcResult<GetFlowStatisticsOutput>>() {
+
+ @Override
+ public void onFailure(@Nonnull Throwable error) {
+ LOG.error("getFlowStatistics RPC failed for node: {} ", node, error);
+ }
+
+ @Override
+ public void onSuccess(RpcResult<GetFlowStatisticsOutput> result) {
+ if (result != null) {
+ if (result.isSuccessful()) {
+ GetFlowStatisticsOutput flowStatsRpcResult = result.getResult();
+ // process FlowStatistics RPC result
+ processFlowStatistics(flowStatsRpcResult, node);
+ } else {
+ LOG.error("getFlowStatistics RPC failed for node: {} with error: {}",
+ node, result.getErrors());
+ }
+ }
+ }
+ }, MoreExecutors.directExecutor());
+
+ delay();
}
}
- private GetAllNodeConnectorsStatisticsInput buildGetAllNodeConnectorStatistics(BigInteger dpId) {
- return new GetAllNodeConnectorsStatisticsInputBuilder()
+ /**
+ * The delay is added to spread the RPC call of the switches to query statistics
+ * across the polling interval.
+ * delay factor is calculated by dividing pollinginterval by no.of.switches.
+ */
+ private void delay() {
+ try {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(delayStatsQuery));
+ } catch (InterruptedException ex) {
+ LOG.error("InterruptedException");
+ }
+ }
+
+ /**
+ * This method builds GetNodeConnectorStatisticsInput which is input for NodeConnectorStatistics RPC.
+ */
+ private GetNodeConnectorStatisticsInput buildGetNodeConnectorStatisticsInput(String dpId) {
+ return new GetNodeConnectorStatisticsInputBuilder()
.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
- .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId.toString()))).build()))
+ .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build()))
.build();
}
- private GetFlowTablesStatisticsInput buildGetFlowTablesStatistics(BigInteger dpId) {
- return new GetFlowTablesStatisticsInputBuilder()
+ /**
+ * This method builds GetFlowStatisticsInput which is input for FlowStatistics RPC.
+ */
+ private GetFlowStatisticsInput buildGetFlowStatisticsInput(String dpId) {
+ return new GetFlowStatisticsInputBuilder()
.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
- .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId.toString()))).build()))
+ .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build()))
.build();
}
}
- private ThreadFactory getThreadFactory(String threadNameFormat) {
- ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
- builder.setNameFormat(threadNameFormat);
- builder.setUncaughtExceptionHandler((thread, exception) -> LOG
- .error("Received Uncaught Exception event in Thread: {}", thread.getName(), exception));
- return builder.build();
- }
-
- /*
- * PortRpcStatisticsListener listens for the NodeConnectorStatisticsUpdate
- * and then update the corresponding counter map
+ /**
+ * This method processes NodeConnectorStatistics RPC result.
+ * It performs:
+ * - fetches various OF Port counters values
+ * - creates/updates new OF Port counters using Infrautils metrics API
+ * - set counter with values fetched from NodeConnectorStatistics
*/
- class PortRpcStatisticsListener implements OpendaylightPortStatisticsListener {
-
- @Override
- public void onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate ncStats) {
- Map<String, String> ncIdOFPortDurationMap = new HashMap<>();
- Map<String, String> ncIdOFPortReceiveDropMap = new HashMap<>();
- Map<String, String> ncIdOFPortReceiveError = new HashMap<>();
- Map<String, String> ncIdPacketSentMap = new HashMap<>();
- Map<String, String> ncIdPacketReceiveMap = new HashMap<>();
- Map<String, String> ncIdBytesSentMap = new HashMap<>();
- Map<String, String> ncIdBytesReceiveMap = new HashMap<>();
- List<NodeConnectorStatisticsAndPortNumberMap> ncStatsAndPortMapList = ncStats
- .getNodeConnectorStatisticsAndPortNumberMap();
- NodeId nodeId = ncStats.getId();
- String node = nodeId.getValue().split(":")[1];
- for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : ncStatsAndPortMapList) {
- NodeConnectorId nodeConnector = ncStatsAndPortMap.getNodeConnectorId();
- String port = nodeConnector.getValue().split(":")[2];
- String nodePortStr = "dpnId_" + node + "_portNum_" + port;
- ncIdOFPortDurationMap.put("OFPortDuration:" + nodePortStr + "_OFPortDuration",
- ncStatsAndPortMap.getDuration().getSecond().getValue().toString());
- ncIdOFPortReceiveDropMap.put(
- "PacketsPerOFPortReceiveDrop:" + nodePortStr + "_PacketsPerOFPortReceiveDrop",
- ncStatsAndPortMap.getReceiveDrops().toString());
- ncIdOFPortReceiveError.put(
- "PacketsPerOFPortReceiveError:" + nodePortStr + "_PacketsPerOFPortReceiveError",
- ncStatsAndPortMap.getReceiveErrors().toString());
- ncIdPacketSentMap.put("PacketsPerOFPortSent:" + nodePortStr + "_PacketsPerOFPortSent",
- ncStatsAndPortMap.getPackets().getTransmitted().toString());
- ncIdPacketReceiveMap.put("PacketsPerOFPortReceive:" + nodePortStr + "_PacketsPerOFPortReceive",
- ncStatsAndPortMap.getPackets().getReceived().toString());
- ncIdBytesSentMap.put("BytesPerOFPortSent:" + nodePortStr + "_BytesPerOFPortSent",
- ncStatsAndPortMap.getBytes().getTransmitted().toString());
- ncIdBytesReceiveMap.put("BytesPerOFPortReceive:" + nodePortStr + "_BytesPerOFPortReceive",
- ncStatsAndPortMap.getBytes().getReceived().toString());
+ private void processNodeConnectorStatistics(GetNodeConnectorStatisticsOutput nodeConnectorStatisticsOutput,
+ String dpid) {
+ String port = "";
+ String portUuid = "";
+ List<NodeConnectorStatisticsAndPortNumberMap> ncStatsAndPortMapList = nodeConnectorStatisticsOutput
+ .getNodeConnectorStatisticsAndPortNumberMap();
+ // Parse NodeConnectorStatistics and create/update counters for them
+ for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : nullToEmpty(ncStatsAndPortMapList)) {
+ NodeConnectorId nodeConnector = ncStatsAndPortMap.getNodeConnectorId();
+ LOG.trace("Create/update metric counter for NodeConnector: {} of node: {}", nodeConnector, dpid);
+ port = nodeConnector.getValue();
+ // update port name as per port name maintained in portNameCache
+ String portNameInCache = "openflow" + ":" + dpid + ":" + port;
+ java.util.Optional<String> portName = portNameCache.get(portNameInCache);
+ if (portName.isPresent()) {
+ Optional<List<InterfaceChildEntry>> interfaceChildEntries = interfaceChildCache
+ .getInterfaceChildEntries(portName.get());
+ if (interfaceChildEntries.isPresent()) {
+ if (!interfaceChildEntries.get().isEmpty()) {
+ portUuid = interfaceChildEntries.get().get(0).getChildInterface();
+ LOG.trace("Retrieved portUuid {} for portname {}", portUuid, portName.get());
+ } else {
+ LOG.trace("PortUuid is not found for portname {}. Skipping IFM counters publish for this port.",
+ portName.get());
+ continue;
+ }
+ } else {
+ LOG.trace("PortUuid is not found for portname {}. Skipping IFM counters publish for this port.",
+ portName.get());
+ continue;
+ }
}
- LOG.trace("Port Stats {}", ncStatsAndPortMapList);
- // Storing allNodeConnectorStats(like ncIdOFPortDurationMap) in a
- // map with key as node for easy removal and addition of
- // allNodeConnectorStats.
- nodeAndNcIdOFPortDurationMap.put(node, ncIdOFPortDurationMap);
- nodeAndNcIdOFPortReceiveDropMap.put(node, ncIdOFPortReceiveDropMap);
- nodeAndNcIdOFPortReceiveError.put(node, ncIdOFPortReceiveError);
- nodeAndNcIdPacketSentMap.put(node, ncIdPacketSentMap);
- nodeAndNcIdPacketReceiveMap.put(node, ncIdPacketReceiveMap);
- nodeAndNcIdBytesSentMap.put(node, ncIdBytesSentMap);
- nodeAndNcIdBytesReceiveMap.put(node, ncIdBytesReceiveMap);
- // Combining the stats of all nodeconnectors in all nodes. This Map
- // will be stored under MBean which will be queried as regular
- // intervals.
- ncIdOFPortDurationMap = combineAllNodesStats(nodeAndNcIdOFPortDurationMap);
- ncIdOFPortReceiveDropMap = combineAllNodesStats(nodeAndNcIdOFPortReceiveDropMap);
- ncIdOFPortReceiveError = combineAllNodesStats(nodeAndNcIdOFPortReceiveError);
- ncIdPacketSentMap = combineAllNodesStats(nodeAndNcIdPacketSentMap);
- ncIdPacketReceiveMap = combineAllNodesStats(nodeAndNcIdPacketReceiveMap);
- ncIdBytesSentMap = combineAllNodesStats(nodeAndNcIdBytesSentMap);
- ncIdBytesReceiveMap = combineAllNodesStats(nodeAndNcIdBytesReceiveMap);
- PMAGENT.connectToPMAgent(ncIdOFPortDurationMap, ncIdOFPortReceiveDropMap, ncIdOFPortReceiveError,
- ncIdPacketSentMap, ncIdPacketReceiveMap, ncIdBytesSentMap, ncIdBytesReceiveMap);
- }
- /*
- * Input allNodesStats contains statistics of all nodeConnectors of all
- * nodes. Key is the node and values contains another map with key as
- * node connector and value as statresult. Output will be a map with key
- * as nodeconnector and value as the statresult. The key contains
- * nodeconnectors of all the nodes.
- */
+ Counter counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_DURATION, dpid, port, portUuid,null);
+ long ofPortDuration = ncStatsAndPortMap.getDuration().getSecond().getValue();
+ updateCounter(counter, ofPortDuration);
+
+ counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVDROP, dpid, port, portUuid, null);
+ long packetsPerOFPortReceiveDrop = ncStatsAndPortMap.getReceiveDrops().longValue();
+ updateCounter(counter, packetsPerOFPortReceiveDrop);
+
+ counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVERROR, dpid, port, portUuid, null);
+ long packetsPerOFPortReceiveError = ncStatsAndPortMap.getReceiveErrors().longValue();
+ updateCounter(counter, packetsPerOFPortReceiveError);
+
+ counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_SENT, dpid, port, portUuid, null);
+ long packetsPerOFPortSent = ncStatsAndPortMap.getPackets().getTransmitted().longValue();
+ updateCounter(counter, packetsPerOFPortSent);
+
+ counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECV, dpid, port, portUuid, null);
+ long packetsPerOFPortReceive = ncStatsAndPortMap.getPackets().getReceived().longValue();
+ updateCounter(counter, packetsPerOFPortReceive);
+
+ counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_SENT, dpid, port, portUuid, null);
+ long bytesPerOFPortSent = ncStatsAndPortMap.getBytes().getTransmitted().longValue();
+ updateCounter(counter, bytesPerOFPortSent);
+
+ counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_RECV, dpid, port, portUuid, null);
+ long bytesPerOFPortReceive = ncStatsAndPortMap.getBytes().getReceived().longValue();
+ updateCounter(counter, bytesPerOFPortReceive);
+ }
}
- private Map<String, String> combineAllNodesStats(Map<String, Map<String, String>> allNodesStats) {
- Map<String, String> allNcsStatsMap = new HashMap<>();
- for (Map.Entry<String, Map<String, String>> entry : allNodesStats.entrySet()) {
- Map<String, String> ncStatsMap = entry.getValue();
- for (Map.Entry<String, String> statResult : ncStatsMap.entrySet()) {
- allNcsStatsMap.put(statResult.getKey(), statResult.getValue());
- }
+ /**
+ * This method processes FlowStatistics RPC result.
+ * It performs:
+ * - fetches all flows of node
+ * - stores flows count per table in local map
+ * - creates/updates Flow table counters using Infrautils metrics API
+ * - set counter with values fetched from FlowStatistics
+ */
+ private void processFlowStatistics(GetFlowStatisticsOutput flowStatsOutput, String dpid) {
+ Map<Short, AtomicInteger> flowTableMap = new HashMap<>();
+ // Get all flows for node from RPC result
+ List<FlowAndStatisticsMapList> flowTableAndStatisticsMapList = flowStatsOutput.getFlowAndStatisticsMapList();
+ for (FlowAndStatisticsMapList flowAndStatisticsMap : nullToEmpty(flowTableAndStatisticsMapList)) {
+ short tableId = flowAndStatisticsMap.getTableId();
+ // populate map to maintain flow count per table
+ flowTableMap.computeIfAbsent(tableId, key -> new AtomicInteger(0)).incrementAndGet();
+ }
+ LOG.trace("FlowTableStatistics (tableId:counter): {} for node: {}", flowTableMap.entrySet(), dpid);
+ for (Map.Entry<Short, AtomicInteger> flowTable : flowTableMap.entrySet()) {
+ Short tableId = flowTable.getKey();
+ AtomicInteger flowCount = flowTable.getValue();
+ Counter counter = getCounter(CounterConstants.IFM_FLOW_TBL_COUNTER_FLOWS_PER_TBL, dpid, null, null,
+ tableId.toString());
+ // update counter value
+ updateCounter(counter, flowCount.longValue());
}
- return allNcsStatsMap;
}
/*
- * FlowRpcStatisticsListener listens for the FlowTableStatisticsUpdate and
- * then update the corresponding counter map
+ * This method returns counter and also creates counter if does not exist.
+ *
+ * @param counterName name of the counter
+ * @param switchId datapath-id value
+ * @param port port-id value
+ * @param aliasId alias-id value
+ * @param tableId table-id value of switch
+ * @return counter object
*/
- class FlowRpcStatisticsListener implements OpendaylightFlowTableStatisticsListener {
+ private Counter getCounter(String counterName, String switchId, String port, String aliasId, String tableId) {
+ /*
+ * Pattern to be followed for key generation:
+ *
+ * genius.interfacemanager.entitycounter{entitytype=port,switchid=value,portid=value,aliasid=value,
+ * name=counterName}
+ * genius.interfacemanager.entitycounter{entitytype=flowtable,switchid=value,flowtableid=value,name=counterName}
+ */
+ Counter counter = null;
+ if (port != null) {
+ Labeled<Labeled<Labeled<Labeled<Labeled<Counter>>>>> labeledCounter =
+ metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius")
+ .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
+ CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
+ CounterConstants.LBL_KEY_PORTID, CounterConstants.LBL_KEY_ALIASID,
+ CounterConstants.LBL_KEY_COUNTER_NAME);
+ counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_PORT).label(switchId)
+ .label(port).label(aliasId).label(counterName);
+ }
+ if (tableId != null) {
+ Labeled<Labeled<Labeled<Labeled<Counter>>>> labeledCounter =
+ metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius")
+ .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
+ CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
+ CounterConstants.LBL_KEY_FLOWTBLID, CounterConstants.LBL_KEY_COUNTER_NAME);
+ counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_FLOWTBL).label(switchId)
+ .label(tableId).label(counterName);
+ }
- @Override
- public void onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate flowTableStats) {
- String node = flowTableStats.getId().getValue().split(":")[1];
- Map<String, String> entriesPerOFTableMap = new HashMap<>();
- List<FlowTableAndStatisticsMap> flowTableAndStatisticsMapList = flowTableStats
- .getFlowTableAndStatisticsMap();
- for (FlowTableAndStatisticsMap flowTableAndStatisticsMap : flowTableAndStatisticsMapList) {
- String nodeTableStr = "dpnId_" + node + "_table_"
- + flowTableAndStatisticsMap.getTableId().getValue().toString();
- entriesPerOFTableMap.put("EntriesPerOFTable:" + nodeTableStr + "_EntriesPerOFTable",
- flowTableAndStatisticsMap.getActiveFlows().getValue().toString());
- }
- nodeAndEntriesPerOFTableMap.put(node, entriesPerOFTableMap);
- entriesPerOFTableMap = combineAllNodesStats(nodeAndEntriesPerOFTableMap);
- PMAGENT.connectToPMAgentAndInvokeEntriesPerOFTable(entriesPerOFTableMap);
+ // create counters set for node if absent.
+ // and then populate counter set with counter object
+ // which will be needed to close counters when node is removed.
+ metricsCountersPerNodeMap.computeIfAbsent(switchId, counterSet -> ConcurrentHashMap.newKeySet()).add(counter);
+
+ return counter;
+ }
+
+ /**
+ * This method updates counter values.
+ */
+ private void updateCounter(Counter counter, long counterValue) {
+ try {
+ // reset counter to zero
+ counter.decrement(counter.get());
+ // set counter to specified value
+ counter.increment(counterValue);
+ } catch (IllegalStateException e) {
+ LOG.error("Metric counter ({}) update has got exception: ", counter, e);
}
}
@Override
protected void remove(InstanceIdentifier<Node> identifier, Node node) {
NodeId nodeId = node.getId();
- String nodeVal = nodeId.getValue().split(":")[1];
- BigInteger dpId = new BigInteger(nodeVal);
+ String dpId = nodeId.getValue().split(":")[1];
if (nodes.contains(dpId)) {
nodes.remove(dpId);
- nodeAndNcIdOFPortDurationMap.remove(nodeVal);
- nodeAndNcIdOFPortReceiveDropMap.remove(nodeVal);
- nodeAndNcIdOFPortReceiveError.remove(nodeVal);
- nodeAndNcIdPacketSentMap.remove(nodeVal);
- nodeAndNcIdPacketReceiveMap.remove(nodeVal);
- nodeAndNcIdBytesSentMap.remove(nodeVal);
- nodeAndNcIdBytesReceiveMap.remove(nodeVal);
- nodeAndEntriesPerOFTableMap.remove(nodeVal);
+ // remove counters set from node
+ Set<Counter> nodeMetricCounterSet = metricsCountersPerNodeMap.remove(dpId);
+ if (nodeMetricCounterSet != null) {
+ // remove counters
+ nodeMetricCounterSet.forEach(UncheckedCloseable::close);
+ }
}
- if (nodes.isEmpty()) {
+ if (nodes.size() > 0) {
+ delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval() / nodes.size();
+ } else {
stopPortStatRequestTask();
+ delayStatsQuery = 0;
}
}
@Override
protected void add(InstanceIdentifier<Node> identifier, Node node) {
NodeId nodeId = node.getId();
- BigInteger dpId = new BigInteger(nodeId.getValue().split(":")[1]);
- if (nodes.contains(dpId)) {
- return;
- }
- nodes.add(dpId);
- if (nodes.size() == 1) {
- schedulePortStatRequestTask();
+ if (entityOwnershipUtils.isEntityOwner(IfmConstants.SERVICE_ENTITY_TYPE, nodeId.getValue())) {
+ LOG.trace("Locally connected switch {}",nodeId.getValue());
+ String dpId = nodeId.getValue().split(":")[1];
+ if (nodes.contains(dpId)) {
+ return;
+ }
+ nodes.add(dpId);
+ delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval() / nodes.size();
+ if (nodes.size() == 1) {
+ schedulePortStatRequestTask();
+ }
+ } else {
+ LOG.trace("Not a locally connected switch {}",nodeId.getValue());
}
}
}