Handle nullable lists
[genius.git] / interfacemanager / interfacemanager-impl / src / main / java / org / opendaylight / genius / interfacemanager / pmcounters / NodeConnectorStatsImpl.java
index 9da255b6256b05bf0cc5132806a0adc2be1e5834..3627b6a3bda20add7392705bbfaa7c460f8c5187 100644 (file)
@@ -7,83 +7,97 @@
  */
 package org.opendaylight.genius.interfacemanager.pmcounters;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.math.BigInteger;
-import java.util.ArrayList;
+import static org.opendaylight.genius.interfacemanager.IfmUtil.nullToEmpty;
+import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningScheduledThreadPool;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nonnull;
+import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.NotificationService;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdate;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
+import org.opendaylight.genius.datastoreutils.AsyncClusteredDataTreeChangeListenerBase;
+import org.opendaylight.genius.interfacemanager.IfmConstants;
+import org.opendaylight.genius.interfacemanager.listeners.InterfaceChildCache;
+import org.opendaylight.genius.interfacemanager.listeners.PortNameCache;
+import org.opendaylight.genius.utils.clustering.EntityOwnershipUtils;
+import org.opendaylight.infrautils.metrics.Counter;
+import org.opendaylight.infrautils.metrics.Labeled;
+import org.opendaylight.infrautils.metrics.MetricDescriptor;
+import org.opendaylight.infrautils.metrics.MetricProvider;
+import org.opendaylight.infrautils.utils.UncheckedCloseable;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.OpendaylightDirectStatisticsService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.config.rev160406.IfmConfig;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.meta.rev160406._interface.child.info._interface.parent.entry.InterfaceChildEntry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Singleton
-public class NodeConnectorStatsImpl extends AsyncDataTreeChangeListenerBase<Node, NodeConnectorStatsImpl> {
+public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListenerBase<Node, NodeConnectorStatsImpl> {
+
     private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorStatsImpl.class);
-    private static final String STATS_POLL_FLAG = "interfacemgr.pmcounters.poll";
+
     private static final int THREAD_POOL_SIZE = 4;
-    private static final int NO_DELAY = 0;
-    public static final PMAgentForNodeConnectorCounters PMAGENT = new PMAgentForNodeConnectorCounters();
-    private final PortRpcStatisticsListener portStatsListener = new PortRpcStatisticsListener();
-    private final FlowRpcStatisticsListener flowTableStatsListener = new FlowRpcStatisticsListener();
-    private final List<BigInteger> nodes = new ArrayList<>();
-    Map<String, Map<String, String>> nodeAndNcIdOFPortDurationMap = new ConcurrentHashMap<>();
-    Map<String, Map<String, String>> nodeAndNcIdOFPortReceiveDropMap = new ConcurrentHashMap<>();
-    Map<String, Map<String, String>> nodeAndNcIdOFPortReceiveError = new ConcurrentHashMap<>();
-    Map<String, Map<String, String>> nodeAndNcIdPacketSentMap = new ConcurrentHashMap<>();
-    Map<String, Map<String, String>> nodeAndNcIdPacketReceiveMap = new ConcurrentHashMap<>();
-    Map<String, Map<String, String>> nodeAndNcIdBytesSentMap = new ConcurrentHashMap<>();
-    Map<String, Map<String, String>> nodeAndNcIdBytesReceiveMap = new ConcurrentHashMap<>();
-    Map<String, Map<String, String>> nodeAndEntriesPerOFTableMap = new ConcurrentHashMap<>();
-    private ScheduledFuture<?> scheduledResult;
-    private final OpendaylightPortStatisticsService statPortService;
+    private final Set<String> nodes = ConcurrentHashMap.newKeySet();
+    private final Map<String, Set<Counter>> metricsCountersPerNodeMap = new ConcurrentHashMap<>();
+    private final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService;
     private final ScheduledExecutorService portStatExecutorService;
-    private final OpendaylightFlowTableStatisticsService opendaylightFlowTableStatisticsService;
+    private final EntityOwnershipUtils entityOwnershipUtils;
+    private final PortNameCache portNameCache;
+    private final InterfaceChildCache interfaceChildCache;
+    private final IfmConfig ifmConfig;
+    private final MetricProvider metricProvider;
+
+    private volatile int delayStatsQuery;
+    private ScheduledFuture<?> scheduledResult;
 
     @Inject
-    public NodeConnectorStatsImpl(DataBroker dataBroker, NotificationService notificationService,
-                                  final OpendaylightPortStatisticsService opendaylightPortStatisticsService,
-                                  final OpendaylightFlowTableStatisticsService opendaylightFlowTableStatisticsService) {
+    public NodeConnectorStatsImpl(DataBroker dataBroker,
+                                  final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService,
+                                  final EntityOwnershipUtils entityOwnershipUtils,
+                                  final PortNameCache portNameCache,
+                                  final InterfaceChildCache interfaceChildCache,
+                                  final IfmConfig ifmConfigObj,
+                                  final MetricProvider metricProvider) {
         super(Node.class, NodeConnectorStatsImpl.class);
-        this.statPortService = opendaylightPortStatisticsService;
-        this.opendaylightFlowTableStatisticsService = opendaylightFlowTableStatisticsService;
+        this.opendaylightDirectStatisticsService = opendaylightDirectStatisticsService;
+        this.entityOwnershipUtils = entityOwnershipUtils;
+        this.portNameCache = portNameCache;
+        this.interfaceChildCache = interfaceChildCache;
+        this.ifmConfig = ifmConfigObj;
+        this.metricProvider = metricProvider;
         registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker);
-        portStatExecutorService = Executors.newScheduledThreadPool(THREAD_POOL_SIZE,
-            getThreadFactory("Port Stats " + "Request Task"));
-        notificationService.registerNotificationListener(portStatsListener);
-        notificationService.registerNotificationListener(flowTableStatsListener);
-        PMAGENT.registerMbean();
+        portStatExecutorService = newListeningScheduledThreadPool(THREAD_POOL_SIZE, "Port Stats Request Task", LOG);
     }
 
     @Override
@@ -96,18 +110,25 @@ public class NodeConnectorStatsImpl extends AsyncDataTreeChangeListenerBase<Node
         return NodeConnectorStatsImpl.this;
     }
 
+    @Override
+    @PreDestroy
+    public void close() {
+        // close the nested counter objects for each node
+        metricsCountersPerNodeMap.values().forEach(counterSet -> counterSet.forEach(UncheckedCloseable::close));
+    }
+
     /*
      * PortStat request task is started when first DPN gets connected
      */
     private void schedulePortStatRequestTask() {
-        if (!Boolean.getBoolean(STATS_POLL_FLAG)) {
+        if (!ifmConfig.isIfmStatsPollEnabled()) {
             LOG.info("Port statistics is turned off");
             return;
         }
         LOG.info("Scheduling port statistics request");
         PortStatRequestTask portStatRequestTask = new PortStatRequestTask();
-        scheduledResult = portStatExecutorService.scheduleAtFixedRate(portStatRequestTask, NO_DELAY, 10000,
-                TimeUnit.MILLISECONDS);
+        scheduledResult = portStatExecutorService.scheduleWithFixedDelay(portStatRequestTask,
+                ifmConfig.getIfmStatsDefPollInterval(), ifmConfig.getIfmStatsDefPollInterval(), TimeUnit.MINUTES);
     }
 
     /*
@@ -132,163 +153,276 @@ public class NodeConnectorStatsImpl extends AsyncDataTreeChangeListenerBase<Node
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Requesting port stats - {}");
             }
-            for (BigInteger node : nodes) {
-                LOG.trace("Requesting AllNodeConnectorStatistics for node - {}", node);
-                statPortService.getAllNodeConnectorsStatistics(buildGetAllNodeConnectorStatistics(node));
-                opendaylightFlowTableStatisticsService.getFlowTablesStatistics(buildGetFlowTablesStatistics(node));
+            for (String node : nodes) {
+                LOG.trace("Requesting AllNodeConnectorStatistics and flow table statistics for node - {}", node);
+                // Call RPC to Get NodeConnector Stats for node
+                ListenableFuture<RpcResult<GetNodeConnectorStatisticsOutput>> ncStatsFuture =
+                        opendaylightDirectStatisticsService.getNodeConnectorStatistics(
+                            buildGetNodeConnectorStatisticsInput(node));
+
+                Futures.addCallback(ncStatsFuture, new FutureCallback<RpcResult<GetNodeConnectorStatisticsOutput>>() {
+
+                    @Override
+                    public void onFailure(@Nonnull Throwable error) {
+                        LOG.error("getNodeConnectorStatistics RPC failed for node: {} ", node, error);
+                    }
+
+                    @Override
+                    public void onSuccess(RpcResult<GetNodeConnectorStatisticsOutput> result) {
+                        if (result != null) {
+                            if (result.isSuccessful()) {
+                                GetNodeConnectorStatisticsOutput ncStatsRpcResult = result.getResult();
+                                // process NodeConnectorStatistics RPC result
+                                processNodeConnectorStatistics(ncStatsRpcResult, node);
+                            } else {
+                                LOG.error("getNodeConnectorStatistics RPC failed for node: {} with error: {}",
+                                        node, result.getErrors());
+                            }
+                        }
+                    }
+                }, MoreExecutors.directExecutor());
+
+                // Call RPC to Get flow stats for node
+                ListenableFuture<RpcResult<GetFlowStatisticsOutput>> flowStatsFuture =
+                        opendaylightDirectStatisticsService.getFlowStatistics(buildGetFlowStatisticsInput(node));
+
+                Futures.addCallback(flowStatsFuture, new FutureCallback<RpcResult<GetFlowStatisticsOutput>>() {
+
+                    @Override
+                    public void onFailure(@Nonnull Throwable error) {
+                        LOG.error("getFlowStatistics RPC failed for node: {} ", node, error);
+                    }
+
+                    @Override
+                    public void onSuccess(RpcResult<GetFlowStatisticsOutput> result) {
+                        if (result != null) {
+                            if (result.isSuccessful()) {
+                                GetFlowStatisticsOutput flowStatsRpcResult = result.getResult();
+                                // process FlowStatistics RPC result
+                                processFlowStatistics(flowStatsRpcResult, node);
+                            } else {
+                                LOG.error("getFlowStatistics RPC failed for node: {} with error: {}",
+                                        node, result.getErrors());
+                            }
+                        }
+                    }
+                }, MoreExecutors.directExecutor());
+
+                delay();
             }
         }
 
-        private GetAllNodeConnectorsStatisticsInput buildGetAllNodeConnectorStatistics(BigInteger dpId) {
-            return new GetAllNodeConnectorsStatisticsInputBuilder()
+        /**
+         * The delay is added to spread the RPC call of the switches to query statistics
+         * across the polling interval.
+         * delay factor is calculated by dividing pollinginterval by no.of.switches.
+         */
+        private void delay() {
+            try {
+                Thread.sleep(TimeUnit.SECONDS.toMillis(delayStatsQuery));
+            } catch (InterruptedException ex) {
+                LOG.error("InterruptedException");
+            }
+        }
+
+        /**
+         * This method builds GetNodeConnectorStatisticsInput which is input for NodeConnectorStatistics RPC.
+         */
+        private GetNodeConnectorStatisticsInput buildGetNodeConnectorStatisticsInput(String dpId) {
+            return new GetNodeConnectorStatisticsInputBuilder()
                     .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
-                            .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId.toString()))).build()))
+                            .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build()))
                     .build();
         }
 
-        private GetFlowTablesStatisticsInput buildGetFlowTablesStatistics(BigInteger dpId) {
-            return new GetFlowTablesStatisticsInputBuilder()
+        /**
+         * This method builds GetFlowStatisticsInput which is input for FlowStatistics RPC.
+         */
+        private GetFlowStatisticsInput buildGetFlowStatisticsInput(String dpId) {
+            return new GetFlowStatisticsInputBuilder()
                     .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
-                            .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId.toString()))).build()))
+                            .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build()))
                     .build();
         }
     }
 
-    private ThreadFactory getThreadFactory(String threadNameFormat) {
-        ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-        builder.setNameFormat(threadNameFormat);
-        builder.setUncaughtExceptionHandler((thread, exception) -> LOG
-                .error("Received Uncaught Exception event in Thread: {}", thread.getName(), exception));
-        return builder.build();
-    }
-
-    /*
-     * PortRpcStatisticsListener listens for the NodeConnectorStatisticsUpdate
-     * and then update the corresponding counter map
+    /**
+     * This method processes NodeConnectorStatistics RPC result.
+     * It performs:
+     * - fetches various OF Port counters values
+     * - creates/updates new OF Port counters using Infrautils metrics API
+     * - set counter with values fetched from NodeConnectorStatistics
      */
-    class PortRpcStatisticsListener implements OpendaylightPortStatisticsListener {
-
-        @Override
-        public void onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate ncStats) {
-            Map<String, String> ncIdOFPortDurationMap = new HashMap<>();
-            Map<String, String> ncIdOFPortReceiveDropMap = new HashMap<>();
-            Map<String, String> ncIdOFPortReceiveError = new HashMap<>();
-            Map<String, String> ncIdPacketSentMap = new HashMap<>();
-            Map<String, String> ncIdPacketReceiveMap = new HashMap<>();
-            Map<String, String> ncIdBytesSentMap = new HashMap<>();
-            Map<String, String> ncIdBytesReceiveMap = new HashMap<>();
-            List<NodeConnectorStatisticsAndPortNumberMap> ncStatsAndPortMapList = ncStats
-                    .getNodeConnectorStatisticsAndPortNumberMap();
-            NodeId nodeId = ncStats.getId();
-            String node = nodeId.getValue().split(":")[1];
-            for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : ncStatsAndPortMapList) {
-                NodeConnectorId nodeConnector = ncStatsAndPortMap.getNodeConnectorId();
-                String port = nodeConnector.getValue().split(":")[2];
-                String nodePortStr = "dpnId_" + node + "_portNum_" + port;
-                ncIdOFPortDurationMap.put("OFPortDuration:" + nodePortStr + "_OFPortDuration",
-                        ncStatsAndPortMap.getDuration().getSecond().getValue().toString());
-                ncIdOFPortReceiveDropMap.put(
-                        "PacketsPerOFPortReceiveDrop:" + nodePortStr + "_PacketsPerOFPortReceiveDrop",
-                        ncStatsAndPortMap.getReceiveDrops().toString());
-                ncIdOFPortReceiveError.put(
-                        "PacketsPerOFPortReceiveError:" + nodePortStr + "_PacketsPerOFPortReceiveError",
-                        ncStatsAndPortMap.getReceiveErrors().toString());
-                ncIdPacketSentMap.put("PacketsPerOFPortSent:" + nodePortStr + "_PacketsPerOFPortSent",
-                        ncStatsAndPortMap.getPackets().getTransmitted().toString());
-                ncIdPacketReceiveMap.put("PacketsPerOFPortReceive:" + nodePortStr + "_PacketsPerOFPortReceive",
-                        ncStatsAndPortMap.getPackets().getReceived().toString());
-                ncIdBytesSentMap.put("BytesPerOFPortSent:" + nodePortStr + "_BytesPerOFPortSent",
-                        ncStatsAndPortMap.getBytes().getTransmitted().toString());
-                ncIdBytesReceiveMap.put("BytesPerOFPortReceive:" + nodePortStr + "_BytesPerOFPortReceive",
-                        ncStatsAndPortMap.getBytes().getReceived().toString());
+    private void processNodeConnectorStatistics(GetNodeConnectorStatisticsOutput nodeConnectorStatisticsOutput,
+                                                String dpid) {
+        String port = "";
+        String portUuid = "";
+        List<NodeConnectorStatisticsAndPortNumberMap> ncStatsAndPortMapList = nodeConnectorStatisticsOutput
+                        .getNodeConnectorStatisticsAndPortNumberMap();
+        // Parse NodeConnectorStatistics and create/update counters for them
+        for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : nullToEmpty(ncStatsAndPortMapList)) {
+            NodeConnectorId nodeConnector = ncStatsAndPortMap.getNodeConnectorId();
+            LOG.trace("Create/update metric counter for NodeConnector: {} of node: {}", nodeConnector, dpid);
+            port = nodeConnector.getValue();
+            // update port name as per port name maintained in portNameCache
+            String portNameInCache = "openflow" + ":" + dpid + ":" + port;
+            java.util.Optional<String> portName = portNameCache.get(portNameInCache);
+            if (portName.isPresent()) {
+                Optional<List<InterfaceChildEntry>> interfaceChildEntries = interfaceChildCache
+                        .getInterfaceChildEntries(portName.get());
+                if (interfaceChildEntries.isPresent()) {
+                    if (!interfaceChildEntries.get().isEmpty()) {
+                        portUuid = interfaceChildEntries.get().get(0).getChildInterface();
+                        LOG.trace("Retrieved portUuid {} for portname {}", portUuid, portName.get());
+                    } else {
+                        LOG.trace("PortUuid is not found for portname {}. Skipping IFM counters publish for this port.",
+                            portName.get());
+                        continue;
+                    }
+                } else {
+                    LOG.trace("PortUuid is not found for portname {}. Skipping IFM counters publish for this port.",
+                        portName.get());
+                    continue;
+                }
             }
-            LOG.trace("Port Stats {}", ncStatsAndPortMapList);
-            // Storing allNodeConnectorStats(like ncIdOFPortDurationMap) in a
-            // map with key as node for easy removal and addition of
-            // allNodeConnectorStats.
-            nodeAndNcIdOFPortDurationMap.put(node, ncIdOFPortDurationMap);
-            nodeAndNcIdOFPortReceiveDropMap.put(node, ncIdOFPortReceiveDropMap);
-            nodeAndNcIdOFPortReceiveError.put(node, ncIdOFPortReceiveError);
-            nodeAndNcIdPacketSentMap.put(node, ncIdPacketSentMap);
-            nodeAndNcIdPacketReceiveMap.put(node, ncIdPacketReceiveMap);
-            nodeAndNcIdBytesSentMap.put(node, ncIdBytesSentMap);
-            nodeAndNcIdBytesReceiveMap.put(node, ncIdBytesReceiveMap);
-            // Combining the stats of all nodeconnectors in all nodes. This Map
-            // will be stored under MBean which will be queried as regular
-            // intervals.
-            ncIdOFPortDurationMap = combineAllNodesStats(nodeAndNcIdOFPortDurationMap);
-            ncIdOFPortReceiveDropMap = combineAllNodesStats(nodeAndNcIdOFPortReceiveDropMap);
-            ncIdOFPortReceiveError = combineAllNodesStats(nodeAndNcIdOFPortReceiveError);
-            ncIdPacketSentMap = combineAllNodesStats(nodeAndNcIdPacketSentMap);
-            ncIdPacketReceiveMap = combineAllNodesStats(nodeAndNcIdPacketReceiveMap);
-            ncIdBytesSentMap = combineAllNodesStats(nodeAndNcIdBytesSentMap);
-            ncIdBytesReceiveMap = combineAllNodesStats(nodeAndNcIdBytesReceiveMap);
-            PMAGENT.connectToPMAgent(ncIdOFPortDurationMap, ncIdOFPortReceiveDropMap, ncIdOFPortReceiveError,
-                    ncIdPacketSentMap, ncIdPacketReceiveMap, ncIdBytesSentMap, ncIdBytesReceiveMap);
-        }
 
-        /*
-         * Input allNodesStats contains statistics of all nodeConnectors of all
-         * nodes. Key is the node and values contains another map with key as
-         * node connector and value as statresult. Output will be a map with key
-         * as nodeconnector and value as the statresult. The key contains
-         * nodeconnectors of all the nodes.
-         */
+            Counter counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_DURATION, dpid, port, portUuid,null);
+            long ofPortDuration = ncStatsAndPortMap.getDuration().getSecond().getValue();
+            updateCounter(counter, ofPortDuration);
+
+            counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVDROP, dpid, port, portUuid, null);
+            long packetsPerOFPortReceiveDrop = ncStatsAndPortMap.getReceiveDrops().longValue();
+            updateCounter(counter, packetsPerOFPortReceiveDrop);
+
+            counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECVERROR, dpid, port, portUuid, null);
+            long packetsPerOFPortReceiveError = ncStatsAndPortMap.getReceiveErrors().longValue();
+            updateCounter(counter, packetsPerOFPortReceiveError);
+
+            counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_SENT, dpid, port, portUuid, null);
+            long packetsPerOFPortSent = ncStatsAndPortMap.getPackets().getTransmitted().longValue();
+            updateCounter(counter, packetsPerOFPortSent);
+
+            counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_PKT_RECV, dpid, port, portUuid, null);
+            long packetsPerOFPortReceive = ncStatsAndPortMap.getPackets().getReceived().longValue();
+            updateCounter(counter, packetsPerOFPortReceive);
+
+            counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_SENT, dpid, port, portUuid, null);
+            long bytesPerOFPortSent = ncStatsAndPortMap.getBytes().getTransmitted().longValue();
+            updateCounter(counter, bytesPerOFPortSent);
+
+            counter = getCounter(CounterConstants.IFM_PORT_COUNTER_OFPORT_BYTE_RECV, dpid, port, portUuid, null);
+            long bytesPerOFPortReceive = ncStatsAndPortMap.getBytes().getReceived().longValue();
+            updateCounter(counter, bytesPerOFPortReceive);
+        }
     }
 
-    private Map<String, String> combineAllNodesStats(Map<String, Map<String, String>> allNodesStats) {
-        Map<String, String> allNcsStatsMap = new HashMap<>();
-        for (Map.Entry<String, Map<String, String>> entry : allNodesStats.entrySet()) {
-            Map<String, String> ncStatsMap = entry.getValue();
-            for (Map.Entry<String, String> statResult : ncStatsMap.entrySet()) {
-                allNcsStatsMap.put(statResult.getKey(), statResult.getValue());
-            }
+    /**
+     * This method processes FlowStatistics RPC result.
+     * It performs:
+     * - fetches all flows of node
+     * - stores flows count per table in local map
+     * - creates/updates Flow table counters using Infrautils metrics API
+     * - set counter with values fetched from FlowStatistics
+     */
+    private void processFlowStatistics(GetFlowStatisticsOutput flowStatsOutput, String dpid) {
+        Map<Short, AtomicInteger> flowTableMap = new HashMap<>();
+        // Get all flows for node from RPC result
+        List<FlowAndStatisticsMapList> flowTableAndStatisticsMapList = flowStatsOutput.getFlowAndStatisticsMapList();
+        for (FlowAndStatisticsMapList flowAndStatisticsMap : nullToEmpty(flowTableAndStatisticsMapList)) {
+            short tableId = flowAndStatisticsMap.getTableId();
+            // populate map to maintain flow count per table
+            flowTableMap.computeIfAbsent(tableId, key -> new AtomicInteger(0)).incrementAndGet();
+        }
+        LOG.trace("FlowTableStatistics (tableId:counter): {} for node: {}", flowTableMap.entrySet(), dpid);
+        for (Map.Entry<Short, AtomicInteger> flowTable : flowTableMap.entrySet()) {
+            Short tableId = flowTable.getKey();
+            AtomicInteger flowCount = flowTable.getValue();
+            Counter counter = getCounter(CounterConstants.IFM_FLOW_TBL_COUNTER_FLOWS_PER_TBL, dpid, null, null,
+                    tableId.toString());
+            // update counter value
+            updateCounter(counter, flowCount.longValue());
         }
-        return allNcsStatsMap;
     }
 
     /*
-     * FlowRpcStatisticsListener listens for the FlowTableStatisticsUpdate and
-     * then update the corresponding counter map
+     * This method returns counter and also creates counter if does not exist.
+     *
+     * @param counterName name of the counter
+     * @param switchId datapath-id value
+     * @param port port-id value
+     * @param aliasId alias-id value
+     * @param tableId table-id value of switch
+     * @return counter object
      */
-    class FlowRpcStatisticsListener implements OpendaylightFlowTableStatisticsListener {
+    private Counter getCounter(String counterName, String switchId, String port, String aliasId, String tableId) {
+        /*
+         * Pattern to be followed for key generation:
+         *
+         * genius.interfacemanager.entitycounter{entitytype=port,switchid=value,portid=value,aliasid=value,
+         * name=counterName}
+         * genius.interfacemanager.entitycounter{entitytype=flowtable,switchid=value,flowtableid=value,name=counterName}
+         */
+        Counter counter = null;
+        if (port != null) {
+            Labeled<Labeled<Labeled<Labeled<Labeled<Counter>>>>> labeledCounter =
+                    metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius")
+                        .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
+                        CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
+                        CounterConstants.LBL_KEY_PORTID, CounterConstants.LBL_KEY_ALIASID,
+                        CounterConstants.LBL_KEY_COUNTER_NAME);
+            counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_PORT).label(switchId)
+                    .label(port).label(aliasId).label(counterName);
+        }
+        if (tableId != null) {
+            Labeled<Labeled<Labeled<Labeled<Counter>>>> labeledCounter =
+                    metricProvider.newCounter(MetricDescriptor.builder().anchor(this).project("genius")
+                        .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
+                        CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
+                        CounterConstants.LBL_KEY_FLOWTBLID, CounterConstants.LBL_KEY_COUNTER_NAME);
+            counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_FLOWTBL).label(switchId)
+                    .label(tableId).label(counterName);
+        }
 
-        @Override
-        public void onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate flowTableStats) {
-            String node = flowTableStats.getId().getValue().split(":")[1];
-            Map<String, String> entriesPerOFTableMap = new HashMap<>();
-            List<FlowTableAndStatisticsMap> flowTableAndStatisticsMapList = flowTableStats
-                    .getFlowTableAndStatisticsMap();
-            for (FlowTableAndStatisticsMap flowTableAndStatisticsMap : flowTableAndStatisticsMapList) {
-                String nodeTableStr = "dpnId_" + node + "_table_"
-                        + flowTableAndStatisticsMap.getTableId().getValue().toString();
-                entriesPerOFTableMap.put("EntriesPerOFTable:" + nodeTableStr + "_EntriesPerOFTable",
-                        flowTableAndStatisticsMap.getActiveFlows().getValue().toString());
-            }
-            nodeAndEntriesPerOFTableMap.put(node, entriesPerOFTableMap);
-            entriesPerOFTableMap = combineAllNodesStats(nodeAndEntriesPerOFTableMap);
-            PMAGENT.connectToPMAgentAndInvokeEntriesPerOFTable(entriesPerOFTableMap);
+        // create counters set for node if absent.
+        // and then populate counter set with counter object
+        // which will be needed to close counters when node is removed.
+        metricsCountersPerNodeMap.computeIfAbsent(switchId, counterSet -> ConcurrentHashMap.newKeySet()).add(counter);
+
+        return counter;
+    }
+
+    /**
+     * This method updates counter values.
+     */
+    private void updateCounter(Counter counter, long counterValue) {
+        try {
+            // reset counter to zero
+            counter.decrement(counter.get());
+            // set counter to specified value
+            counter.increment(counterValue);
+        } catch (IllegalStateException e) {
+            LOG.error("Metric counter ({}) update has got exception: ", counter, e);
         }
     }
 
     @Override
     protected void remove(InstanceIdentifier<Node> identifier, Node node) {
         NodeId nodeId = node.getId();
-        String nodeVal = nodeId.getValue().split(":")[1];
-        BigInteger dpId = new BigInteger(nodeVal);
+        String dpId = nodeId.getValue().split(":")[1];
         if (nodes.contains(dpId)) {
             nodes.remove(dpId);
-            nodeAndNcIdOFPortDurationMap.remove(nodeVal);
-            nodeAndNcIdOFPortReceiveDropMap.remove(nodeVal);
-            nodeAndNcIdOFPortReceiveError.remove(nodeVal);
-            nodeAndNcIdPacketSentMap.remove(nodeVal);
-            nodeAndNcIdPacketReceiveMap.remove(nodeVal);
-            nodeAndNcIdBytesSentMap.remove(nodeVal);
-            nodeAndNcIdBytesReceiveMap.remove(nodeVal);
-            nodeAndEntriesPerOFTableMap.remove(nodeVal);
+            // remove counters set from node
+            Set<Counter> nodeMetricCounterSet = metricsCountersPerNodeMap.remove(dpId);
+            if (nodeMetricCounterSet != null) {
+                // remove counters
+                nodeMetricCounterSet.forEach(UncheckedCloseable::close);
+            }
         }
-        if (nodes.isEmpty()) {
+        if (nodes.size() > 0) {
+            delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval() / nodes.size();
+        } else {
             stopPortStatRequestTask();
+            delayStatsQuery = 0;
         }
     }
 
@@ -300,13 +434,19 @@ public class NodeConnectorStatsImpl extends AsyncDataTreeChangeListenerBase<Node
     @Override
     protected void add(InstanceIdentifier<Node> identifier, Node node) {
         NodeId nodeId = node.getId();
-        BigInteger dpId = new BigInteger(nodeId.getValue().split(":")[1]);
-        if (nodes.contains(dpId)) {
-            return;
-        }
-        nodes.add(dpId);
-        if (nodes.size() == 1) {
-            schedulePortStatRequestTask();
+        if (entityOwnershipUtils.isEntityOwner(IfmConstants.SERVICE_ENTITY_TYPE, nodeId.getValue())) {
+            LOG.trace("Locally connected switch {}",nodeId.getValue());
+            String dpId = nodeId.getValue().split(":")[1];
+            if (nodes.contains(dpId)) {
+                return;
+            }
+            nodes.add(dpId);
+            delayStatsQuery = ifmConfig.getIfmStatsDefPollInterval() / nodes.size();
+            if (nodes.size() == 1) {
+                schedulePortStatRequestTask();
+            }
+        } else {
+            LOG.trace("Not a locally connected switch {}",nodeId.getValue());
         }
     }
 }