HwVTEP support for interfacemanager
[vpnservice.git] / interfacemgr / interfacemgr-impl / src / main / java / org / opendaylight / vpnservice / interfacemgr / pmcounters / NodeConnectorStatsImpl.java
diff --git a/interfacemgr/interfacemgr-impl/src/main/java/org/opendaylight/vpnservice/interfacemgr/pmcounters/NodeConnectorStatsImpl.java b/interfacemgr/interfacemgr-impl/src/main/java/org/opendaylight/vpnservice/interfacemgr/pmcounters/NodeConnectorStatsImpl.java
new file mode 100644 (file)
index 0000000..226af18
--- /dev/null
@@ -0,0 +1,289 @@
+/*
+ * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.vpnservice.interfacemgr.pmcounters;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.NotificationService;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.vpnservice.mdsalutil.AbstractDataChangeListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdate;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class NodeConnectorStatsImpl extends AbstractDataChangeListener<Node>{
+
+    private static final Logger logger = LoggerFactory.getLogger(NodeConnectorStatsImpl.class);
+    private static final int THREAD_POOL_SIZE = 4;
+    private static final int NO_DELAY = 0;
+    public static final PMAgentForNodeConnectorCounters pmagent = new PMAgentForNodeConnectorCounters();
+    private PortRpcStatisticsListener portStatsListener = new PortRpcStatisticsListener();
+    private FlowRpcStatisticsListener flowTableStatsListener = new FlowRpcStatisticsListener();
+    private List<BigInteger> nodes = new ArrayList<>();
+    Map<String, Map<String, String>> nodeAndNcIdOFPortDurationMap = new ConcurrentHashMap<String, Map<String, String>>();
+    Map<String, Map<String, String>> nodeAndNcIdOFPortReceiveDropMap = new ConcurrentHashMap<String, Map<String, String>>();
+    Map<String, Map<String, String>> nodeAndNcIdOFPortReceiveError = new ConcurrentHashMap<String, Map<String, String>>();
+    Map<String, Map<String, String>> nodeAndNcIdPacketSentMap = new ConcurrentHashMap<String, Map<String, String>>();
+    Map<String, Map<String, String>> nodeAndNcIdPacketReceiveMap = new ConcurrentHashMap<String, Map<String, String>>();
+    Map<String, Map<String, String>> nodeAndNcIdBytesSentMap = new ConcurrentHashMap<String, Map<String, String>>();
+    Map<String, Map<String, String>> nodeAndNcIdBytesReceiveMap = new ConcurrentHashMap<String, Map<String, String>>();
+    Map<String, Map<String, String>> nodeAndEntriesPerOFTableMap = new ConcurrentHashMap<String, Map<String, String>>();
+    private ScheduledFuture<?> scheduledResult;
+    private OpendaylightPortStatisticsService statPortService;
+    private ScheduledExecutorService portStatExecutorService;
+    private OpendaylightFlowTableStatisticsService opendaylightFlowTableStatisticsService;
+    public NodeConnectorStatsImpl(DataBroker db, NotificationService notificationService, OpendaylightPortStatisticsService statPortService, OpendaylightFlowTableStatisticsService opendaylightFlowTableStatisticsService) {
+        super(Node.class);
+        this.statPortService = statPortService;
+        this.opendaylightFlowTableStatisticsService = opendaylightFlowTableStatisticsService;
+        registerListener(db);
+        portStatExecutorService = Executors.newScheduledThreadPool(THREAD_POOL_SIZE, getThreadFactory("Port Stats Request Task"));
+        notificationService.registerNotificationListener(portStatsListener);
+        notificationService.registerNotificationListener(flowTableStatsListener);
+        pmagent.registerMbean();
+    }
+
+    private void registerListener(final DataBroker db) {
+        try {
+            db.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
+                    getWildCardPath(), NodeConnectorStatsImpl.this, AsyncDataBroker.DataChangeScope.SUBTREE);
+        } catch (final Exception e) {
+            logger.error("NodeConnectorStatsImpl: DataChange listener registration fail!", e);
+            throw new IllegalStateException("NodeConnectorStatsImpl: registration Listener failed.", e);
+        }
+    }
+
+    private InstanceIdentifier<Node> getWildCardPath() {
+        return InstanceIdentifier.create(Nodes.class).child(Node.class);
+    }
+
+    /*
+     * PortStat request task is started when first DPN gets connected
+     */
+    private void schedulePortStatRequestTask() {
+        logger.info("Scheduling port statistics request");
+        PortStatRequestTask portStatRequestTask = new PortStatRequestTask();
+        scheduledResult = portStatExecutorService.scheduleAtFixedRate(portStatRequestTask, NO_DELAY, 10000, TimeUnit.MILLISECONDS);
+    }
+
+    /*
+     * PortStat request task is stopped when last DPN is removed.
+     */
+    private void stopPortStatRequestTask() {
+        if(scheduledResult != null) {
+            logger.info("Stopping port statistics request");
+            scheduledResult.cancel(true);
+        }
+    }
+
+    /*
+     * This task queries for node connector statistics as well as flowtables statistics every 10 secs.
+     * Minimum period which can be configured for PMJob is 10 secs.
+     */
+    private class PortStatRequestTask implements Runnable {
+
+        @Override
+        public void run() {
+            if(logger.isTraceEnabled()) {
+                logger.trace("Requesting port stats - {}");
+            }
+            for (BigInteger node : nodes) {
+                logger.trace("Requesting AllNodeConnectorStatistics for node - {}", node);
+                statPortService.getAllNodeConnectorsStatistics(buildGetAllNodeConnectorStatistics(node));
+                opendaylightFlowTableStatisticsService.getFlowTablesStatistics(buildGetFlowTablesStatistics(node));
+            }
+        }
+
+        private GetAllNodeConnectorsStatisticsInput buildGetAllNodeConnectorStatistics(BigInteger dpId) {
+            return new GetAllNodeConnectorsStatisticsInputBuilder()
+                    .setNode(
+                            new NodeRef(InstanceIdentifier.builder(Nodes.class)
+                                    .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId.toString()))).build())).build();
+        }
+
+        private GetFlowTablesStatisticsInput buildGetFlowTablesStatistics(BigInteger dpId) {
+            return new GetFlowTablesStatisticsInputBuilder()
+                    .setNode(
+                            new NodeRef(InstanceIdentifier.builder(Nodes.class)
+                                    .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId.toString()))).build())).build();
+        }
+
+    }
+
+    private ThreadFactory getThreadFactory(String threadNameFormat) {
+        ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+        builder.setNameFormat(threadNameFormat);
+        builder.setUncaughtExceptionHandler( new UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                logger.error("Received Uncaught Exception event in Thread: {}", t.getName(), e);
+            }
+        });
+        return builder.build();
+    }
+
+    /*
+     * PortRpcStatisticsListener listens for the NodeConnectorStatisticsUpdate and then update the corresponding counter map
+     */
+    class PortRpcStatisticsListener implements OpendaylightPortStatisticsListener {
+
+        @Override
+        public void onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate ncStats) {
+            Map<String, String> ncIdOFPortDurationMap = new HashMap<String, String>();
+            Map<String, String> ncIdOFPortReceiveDropMap = new HashMap<String, String>();
+            Map<String, String> ncIdOFPortReceiveError = new HashMap<String, String>();
+            Map<String, String> ncIdPacketSentMap = new HashMap<String, String>();
+            Map<String, String> ncIdPacketReceiveMap = new HashMap<String, String>();
+            Map<String, String> ncIdBytesSentMap = new HashMap<String, String>();
+            Map<String, String> ncIdBytesReceiveMap = new HashMap<String, String>();
+            List<NodeConnectorStatisticsAndPortNumberMap> ncStatsAndPortMapList = ncStats.getNodeConnectorStatisticsAndPortNumberMap();
+            NodeId nodeId = ncStats.getId();
+            String node = nodeId.getValue().split(":")[1];
+            for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : ncStatsAndPortMapList) {
+                NodeConnectorId nodeConnector = ncStatsAndPortMap.getNodeConnectorId();
+                String port = nodeConnector.getValue().split(":")[2];
+                String nodePortStr = "dpnId_" + node + "_portNum_" + port;
+                ncIdOFPortDurationMap.put("OFPortDuration:" + nodePortStr + "_OFPortDuration", ncStatsAndPortMap.getDuration().getSecond().getValue().toString());
+                ncIdOFPortReceiveDropMap.put("PacketsPerOFPortReceiveDrop:" + nodePortStr + "_PacketsPerOFPortReceiveDrop", ncStatsAndPortMap.getReceiveDrops().toString());
+                ncIdOFPortReceiveError.put("PacketsPerOFPortReceiveError:" + nodePortStr + "_PacketsPerOFPortReceiveError", ncStatsAndPortMap.getReceiveErrors().toString());
+                ncIdPacketSentMap.put("PacketsPerOFPortSent:" + nodePortStr + "_PacketsPerOFPortSent", ncStatsAndPortMap.getPackets().getTransmitted().toString());
+                ncIdPacketReceiveMap.put("PacketsPerOFPortReceive:" + nodePortStr + "_PacketsPerOFPortReceive", ncStatsAndPortMap.getPackets().getReceived().toString());
+                ncIdBytesSentMap.put("BytesPerOFPortSent:" + nodePortStr + "_BytesPerOFPortSent", ncStatsAndPortMap.getBytes().getTransmitted().toString());
+                ncIdBytesReceiveMap.put("BytesPerOFPortReceive:" + nodePortStr + "_BytesPerOFPortReceive", ncStatsAndPortMap.getBytes().getReceived().toString());
+            }
+            logger.trace("Port Stats {}", ncStatsAndPortMapList);
+            //Storing allNodeConnectorStats(like ncIdOFPortDurationMap) in a map with key as node for easy removal and addition of allNodeConnectorStats.
+            nodeAndNcIdOFPortDurationMap.put(node, ncIdOFPortDurationMap);
+            nodeAndNcIdOFPortReceiveDropMap.put(node, ncIdOFPortReceiveDropMap);
+            nodeAndNcIdOFPortReceiveError.put(node, ncIdOFPortReceiveError);
+            nodeAndNcIdPacketSentMap.put(node, ncIdPacketSentMap);
+            nodeAndNcIdPacketReceiveMap.put(node, ncIdPacketReceiveMap);
+            nodeAndNcIdBytesSentMap.put(node, ncIdBytesSentMap);
+            nodeAndNcIdBytesReceiveMap.put(node, ncIdBytesReceiveMap);
+            //Combining the stats of all nodeconnectors in all nodes. This Map will be stored under MBean which will be queried as regular intervals.
+            ncIdOFPortDurationMap = combineAllNodesStats(nodeAndNcIdOFPortDurationMap);
+            ncIdOFPortReceiveDropMap = combineAllNodesStats(nodeAndNcIdOFPortReceiveDropMap);
+            ncIdOFPortReceiveError = combineAllNodesStats(nodeAndNcIdOFPortReceiveError);
+            ncIdPacketSentMap = combineAllNodesStats(nodeAndNcIdPacketSentMap);
+            ncIdPacketReceiveMap = combineAllNodesStats(nodeAndNcIdPacketReceiveMap);
+            ncIdBytesSentMap = combineAllNodesStats(nodeAndNcIdBytesSentMap);
+            ncIdBytesReceiveMap = combineAllNodesStats(nodeAndNcIdBytesReceiveMap);
+            pmagent.connectToPMAgent(ncIdOFPortDurationMap, ncIdOFPortReceiveDropMap, ncIdOFPortReceiveError, ncIdPacketSentMap, ncIdPacketReceiveMap, ncIdBytesSentMap, ncIdBytesReceiveMap);
+        }
+
+        /*
+         * Input allNodesStats contains statistics of all nodeConnectors of all nodes. Key is the node and values contains another map with key as node connector and value as statresult.
+         * Output will be a map with key as nodeconnector and value as the statresult. The key contains nodeconnectors of all the nodes.
+         */
+    }
+
+    private Map<String, String> combineAllNodesStats(Map<String, Map<String, String>> allNodesStats) {
+        Map<String, String> allNcsStatsMap = new HashMap<String, String>();
+        for (Map.Entry<String, Map<String, String>> entry : allNodesStats.entrySet()) {
+            Map<String, String> ncStatsMap = entry.getValue();
+            for (Map.Entry<String, String> statResult : ncStatsMap.entrySet()) {
+                allNcsStatsMap.put(statResult.getKey(), statResult.getValue());
+            }
+        }
+        return allNcsStatsMap;
+    }
+
+    /*
+     * FlowRpcStatisticsListener listens for the FlowTableStatisticsUpdate and then update the corresponding counter map
+     */
+    class FlowRpcStatisticsListener implements OpendaylightFlowTableStatisticsListener {
+
+        @Override
+        public void onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate flowTableStats) {
+            String node = flowTableStats.getId().getValue().split(":")[1];
+            Map<String, String> entriesPerOFTableMap = new HashMap<String, String>();
+            List<FlowTableAndStatisticsMap> flowTableAndStatisticsMapList = flowTableStats.getFlowTableAndStatisticsMap();
+            for (FlowTableAndStatisticsMap flowTableAndStatisticsMap : flowTableAndStatisticsMapList) {
+                String nodeTableStr =  "dpnId_" + node + "_table_" + flowTableAndStatisticsMap.getTableId().getValue().toString();
+                entriesPerOFTableMap.put("EntriesPerOFTable:" + nodeTableStr + "_EntriesPerOFTable", flowTableAndStatisticsMap.getActiveFlows().getValue().toString());
+            }
+            nodeAndEntriesPerOFTableMap.put(node, entriesPerOFTableMap);
+            entriesPerOFTableMap = combineAllNodesStats(nodeAndEntriesPerOFTableMap);
+            pmagent.connectToPMAgentAndInvokeEntriesPerOFTable(entriesPerOFTableMap);
+        }
+
+    }
+
+    @Override
+    protected void remove(InstanceIdentifier<Node> identifier, Node node) {
+        NodeId nodeId = node.getId();
+        String nodeVal = nodeId.getValue().split(":")[1];
+        BigInteger dpId = new BigInteger(nodeVal);
+        if (nodes.contains(dpId)) {
+            nodes.remove(dpId);
+            nodeAndNcIdOFPortDurationMap.remove(nodeVal);
+            nodeAndNcIdOFPortReceiveDropMap.remove(nodeVal);
+            nodeAndNcIdOFPortReceiveError.remove(nodeVal);
+            nodeAndNcIdPacketSentMap.remove(nodeVal);
+            nodeAndNcIdPacketReceiveMap.remove(nodeVal);
+            nodeAndNcIdBytesSentMap.remove(nodeVal);
+            nodeAndNcIdBytesReceiveMap.remove(nodeVal);
+            nodeAndEntriesPerOFTableMap.remove(nodeVal);
+        }
+        if (nodes.isEmpty()) {
+            stopPortStatRequestTask();
+        }
+    }
+
+    @Override
+    protected void update(InstanceIdentifier<Node> identifier, Node original,
+            Node update) {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    protected void add(InstanceIdentifier<Node> identifier, Node node) {
+        NodeId nodeId = node.getId();
+        BigInteger dpId = new BigInteger(nodeId.getValue().split(":")[1]);
+        if (nodes.contains(dpId)) {
+            return;
+        }
+        nodes.add(dpId);
+        if (nodes.size() == 1) {
+            schedulePortStatRequestTask();
+        }
+    }
+}