2 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.genius.interfacemanager.pmcounters;
10 import com.google.common.util.concurrent.ThreadFactoryBuilder;
11 import java.math.BigInteger;
12 import java.util.ArrayList;
13 import java.util.HashMap;
14 import java.util.List;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.ScheduledFuture;
20 import java.util.concurrent.ThreadFactory;
21 import java.util.concurrent.TimeUnit;
22 import javax.inject.Inject;
23 import javax.inject.Singleton;
24 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
25 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.genius.mdsalutil.AbstractDataChangeListener;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdate;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
47 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
52 public class NodeConnectorStatsImpl extends AbstractDataChangeListener<Node> {
53 private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorStatsImpl.class);
54 private static final String STATS_POLL_FLAG = "interfacemgr.pmcounters.poll";
55 private static final int THREAD_POOL_SIZE = 4;
56 private static final int NO_DELAY = 0;
57 public static final PMAgentForNodeConnectorCounters PMAGENT = new PMAgentForNodeConnectorCounters();
58 private final PortRpcStatisticsListener portStatsListener = new PortRpcStatisticsListener();
59 private final FlowRpcStatisticsListener flowTableStatsListener = new FlowRpcStatisticsListener();
60 private final List<BigInteger> nodes = new ArrayList<>();
61 Map<String, Map<String, String>> nodeAndNcIdOFPortDurationMap = new ConcurrentHashMap<>();
62 Map<String, Map<String, String>> nodeAndNcIdOFPortReceiveDropMap = new ConcurrentHashMap<>();
63 Map<String, Map<String, String>> nodeAndNcIdOFPortReceiveError = new ConcurrentHashMap<>();
64 Map<String, Map<String, String>> nodeAndNcIdPacketSentMap = new ConcurrentHashMap<>();
65 Map<String, Map<String, String>> nodeAndNcIdPacketReceiveMap = new ConcurrentHashMap<>();
66 Map<String, Map<String, String>> nodeAndNcIdBytesSentMap = new ConcurrentHashMap<>();
67 Map<String, Map<String, String>> nodeAndNcIdBytesReceiveMap = new ConcurrentHashMap<>();
68 Map<String, Map<String, String>> nodeAndEntriesPerOFTableMap = new ConcurrentHashMap<>();
69 private ScheduledFuture<?> scheduledResult;
70 private final OpendaylightPortStatisticsService statPortService;
71 private final ScheduledExecutorService portStatExecutorService;
72 private final OpendaylightFlowTableStatisticsService opendaylightFlowTableStatisticsService;
75 public NodeConnectorStatsImpl(DataBroker dataBroker, NotificationService notificationService,
76 final OpendaylightPortStatisticsService opendaylightPortStatisticsService,
77 final OpendaylightFlowTableStatisticsService opendaylightFlowTableStatisticsService) {
79 this.statPortService = opendaylightPortStatisticsService;
80 this.opendaylightFlowTableStatisticsService = opendaylightFlowTableStatisticsService;
81 registerListener(dataBroker);
82 portStatExecutorService = Executors.newScheduledThreadPool(THREAD_POOL_SIZE,
83 getThreadFactory("Port Stats Request Task"));
84 notificationService.registerNotificationListener(portStatsListener);
85 notificationService.registerNotificationListener(flowTableStatsListener);
86 PMAGENT.registerMbean();
89 private void registerListener(final DataBroker db) {
90 db.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, getWildCardPath(), NodeConnectorStatsImpl.this,
91 AsyncDataBroker.DataChangeScope.SUBTREE);
94 private InstanceIdentifier<Node> getWildCardPath() {
95 return InstanceIdentifier.create(Nodes.class).child(Node.class);
99 * PortStat request task is started when first DPN gets connected
101 private void schedulePortStatRequestTask() {
102 if (!Boolean.getBoolean(STATS_POLL_FLAG)) {
103 LOG.info("Port statistics is turned off");
106 LOG.info("Scheduling port statistics request");
107 PortStatRequestTask portStatRequestTask = new PortStatRequestTask();
108 scheduledResult = portStatExecutorService.scheduleAtFixedRate(portStatRequestTask, NO_DELAY, 10000,
109 TimeUnit.MILLISECONDS);
113 * PortStat request task is stopped when last DPN is removed.
115 private void stopPortStatRequestTask() {
116 if (scheduledResult != null) {
117 LOG.info("Stopping port statistics request");
118 scheduledResult.cancel(true);
123 * This task queries for node connector statistics as well as flowtables
124 * statistics every 10 secs. Minimum period which can be configured for
127 private class PortStatRequestTask implements Runnable {
131 if (LOG.isTraceEnabled()) {
132 LOG.trace("Requesting port stats - {}");
134 for (BigInteger node : nodes) {
135 LOG.trace("Requesting AllNodeConnectorStatistics for node - {}", node);
136 statPortService.getAllNodeConnectorsStatistics(buildGetAllNodeConnectorStatistics(node));
137 opendaylightFlowTableStatisticsService.getFlowTablesStatistics(buildGetFlowTablesStatistics(node));
141 private GetAllNodeConnectorsStatisticsInput buildGetAllNodeConnectorStatistics(BigInteger dpId) {
142 return new GetAllNodeConnectorsStatisticsInputBuilder()
143 .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
144 .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId.toString()))).build()))
148 private GetFlowTablesStatisticsInput buildGetFlowTablesStatistics(BigInteger dpId) {
149 return new GetFlowTablesStatisticsInputBuilder()
150 .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
151 .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId.toString()))).build()))
156 private ThreadFactory getThreadFactory(String threadNameFormat) {
157 ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
158 builder.setNameFormat(threadNameFormat);
159 builder.setUncaughtExceptionHandler((thread, exception) -> LOG
160 .error("Received Uncaught Exception event in Thread: {}", thread.getName(), exception));
161 return builder.build();
165 * PortRpcStatisticsListener listens for the NodeConnectorStatisticsUpdate
166 * and then update the corresponding counter map
168 class PortRpcStatisticsListener implements OpendaylightPortStatisticsListener {
171 public void onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate ncStats) {
172 Map<String, String> ncIdOFPortDurationMap = new HashMap<>();
173 Map<String, String> ncIdOFPortReceiveDropMap = new HashMap<>();
174 Map<String, String> ncIdOFPortReceiveError = new HashMap<>();
175 Map<String, String> ncIdPacketSentMap = new HashMap<>();
176 Map<String, String> ncIdPacketReceiveMap = new HashMap<>();
177 Map<String, String> ncIdBytesSentMap = new HashMap<>();
178 Map<String, String> ncIdBytesReceiveMap = new HashMap<>();
179 List<NodeConnectorStatisticsAndPortNumberMap> ncStatsAndPortMapList = ncStats
180 .getNodeConnectorStatisticsAndPortNumberMap();
181 NodeId nodeId = ncStats.getId();
182 String node = nodeId.getValue().split(":")[1];
183 for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : ncStatsAndPortMapList) {
184 NodeConnectorId nodeConnector = ncStatsAndPortMap.getNodeConnectorId();
185 String port = nodeConnector.getValue().split(":")[2];
186 String nodePortStr = "dpnId_" + node + "_portNum_" + port;
187 ncIdOFPortDurationMap.put("OFPortDuration:" + nodePortStr + "_OFPortDuration",
188 ncStatsAndPortMap.getDuration().getSecond().getValue().toString());
189 ncIdOFPortReceiveDropMap.put(
190 "PacketsPerOFPortReceiveDrop:" + nodePortStr + "_PacketsPerOFPortReceiveDrop",
191 ncStatsAndPortMap.getReceiveDrops().toString());
192 ncIdOFPortReceiveError.put(
193 "PacketsPerOFPortReceiveError:" + nodePortStr + "_PacketsPerOFPortReceiveError",
194 ncStatsAndPortMap.getReceiveErrors().toString());
195 ncIdPacketSentMap.put("PacketsPerOFPortSent:" + nodePortStr + "_PacketsPerOFPortSent",
196 ncStatsAndPortMap.getPackets().getTransmitted().toString());
197 ncIdPacketReceiveMap.put("PacketsPerOFPortReceive:" + nodePortStr + "_PacketsPerOFPortReceive",
198 ncStatsAndPortMap.getPackets().getReceived().toString());
199 ncIdBytesSentMap.put("BytesPerOFPortSent:" + nodePortStr + "_BytesPerOFPortSent",
200 ncStatsAndPortMap.getBytes().getTransmitted().toString());
201 ncIdBytesReceiveMap.put("BytesPerOFPortReceive:" + nodePortStr + "_BytesPerOFPortReceive",
202 ncStatsAndPortMap.getBytes().getReceived().toString());
204 LOG.trace("Port Stats {}", ncStatsAndPortMapList);
205 // Storing allNodeConnectorStats(like ncIdOFPortDurationMap) in a
206 // map with key as node for easy removal and addition of
207 // allNodeConnectorStats.
208 nodeAndNcIdOFPortDurationMap.put(node, ncIdOFPortDurationMap);
209 nodeAndNcIdOFPortReceiveDropMap.put(node, ncIdOFPortReceiveDropMap);
210 nodeAndNcIdOFPortReceiveError.put(node, ncIdOFPortReceiveError);
211 nodeAndNcIdPacketSentMap.put(node, ncIdPacketSentMap);
212 nodeAndNcIdPacketReceiveMap.put(node, ncIdPacketReceiveMap);
213 nodeAndNcIdBytesSentMap.put(node, ncIdBytesSentMap);
214 nodeAndNcIdBytesReceiveMap.put(node, ncIdBytesReceiveMap);
215 // Combining the stats of all nodeconnectors in all nodes. This Map
216 // will be stored under MBean which will be queried as regular
218 ncIdOFPortDurationMap = combineAllNodesStats(nodeAndNcIdOFPortDurationMap);
219 ncIdOFPortReceiveDropMap = combineAllNodesStats(nodeAndNcIdOFPortReceiveDropMap);
220 ncIdOFPortReceiveError = combineAllNodesStats(nodeAndNcIdOFPortReceiveError);
221 ncIdPacketSentMap = combineAllNodesStats(nodeAndNcIdPacketSentMap);
222 ncIdPacketReceiveMap = combineAllNodesStats(nodeAndNcIdPacketReceiveMap);
223 ncIdBytesSentMap = combineAllNodesStats(nodeAndNcIdBytesSentMap);
224 ncIdBytesReceiveMap = combineAllNodesStats(nodeAndNcIdBytesReceiveMap);
225 PMAGENT.connectToPMAgent(ncIdOFPortDurationMap, ncIdOFPortReceiveDropMap, ncIdOFPortReceiveError,
226 ncIdPacketSentMap, ncIdPacketReceiveMap, ncIdBytesSentMap, ncIdBytesReceiveMap);
230 * Input allNodesStats contains statistics of all nodeConnectors of all
231 * nodes. Key is the node and values contains another map with key as
232 * node connector and value as statresult. Output will be a map with key
233 * as nodeconnector and value as the statresult. The key contains
234 * nodeconnectors of all the nodes.
238 private Map<String, String> combineAllNodesStats(Map<String, Map<String, String>> allNodesStats) {
239 Map<String, String> allNcsStatsMap = new HashMap<>();
240 for (Map.Entry<String, Map<String, String>> entry : allNodesStats.entrySet()) {
241 Map<String, String> ncStatsMap = entry.getValue();
242 for (Map.Entry<String, String> statResult : ncStatsMap.entrySet()) {
243 allNcsStatsMap.put(statResult.getKey(), statResult.getValue());
246 return allNcsStatsMap;
250 * FlowRpcStatisticsListener listens for the FlowTableStatisticsUpdate and
251 * then update the corresponding counter map
253 class FlowRpcStatisticsListener implements OpendaylightFlowTableStatisticsListener {
256 public void onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate flowTableStats) {
257 String node = flowTableStats.getId().getValue().split(":")[1];
258 Map<String, String> entriesPerOFTableMap = new HashMap<>();
259 List<FlowTableAndStatisticsMap> flowTableAndStatisticsMapList = flowTableStats
260 .getFlowTableAndStatisticsMap();
261 for (FlowTableAndStatisticsMap flowTableAndStatisticsMap : flowTableAndStatisticsMapList) {
262 String nodeTableStr = "dpnId_" + node + "_table_"
263 + flowTableAndStatisticsMap.getTableId().getValue().toString();
264 entriesPerOFTableMap.put("EntriesPerOFTable:" + nodeTableStr + "_EntriesPerOFTable",
265 flowTableAndStatisticsMap.getActiveFlows().getValue().toString());
267 nodeAndEntriesPerOFTableMap.put(node, entriesPerOFTableMap);
268 entriesPerOFTableMap = combineAllNodesStats(nodeAndEntriesPerOFTableMap);
269 PMAGENT.connectToPMAgentAndInvokeEntriesPerOFTable(entriesPerOFTableMap);
274 protected void remove(InstanceIdentifier<Node> identifier, Node node) {
275 NodeId nodeId = node.getId();
276 String nodeVal = nodeId.getValue().split(":")[1];
277 BigInteger dpId = new BigInteger(nodeVal);
278 if (nodes.contains(dpId)) {
280 nodeAndNcIdOFPortDurationMap.remove(nodeVal);
281 nodeAndNcIdOFPortReceiveDropMap.remove(nodeVal);
282 nodeAndNcIdOFPortReceiveError.remove(nodeVal);
283 nodeAndNcIdPacketSentMap.remove(nodeVal);
284 nodeAndNcIdPacketReceiveMap.remove(nodeVal);
285 nodeAndNcIdBytesSentMap.remove(nodeVal);
286 nodeAndNcIdBytesReceiveMap.remove(nodeVal);
287 nodeAndEntriesPerOFTableMap.remove(nodeVal);
289 if (nodes.isEmpty()) {
290 stopPortStatRequestTask();
295 protected void update(InstanceIdentifier<Node> identifier, Node original, Node update) {
296 // TODO Auto-generated method stub
300 protected void add(InstanceIdentifier<Node> identifier, Node node) {
301 NodeId nodeId = node.getId();
302 BigInteger dpId = new BigInteger(nodeId.getValue().split(":")[1]);
303 if (nodes.contains(dpId)) {
307 if (nodes.size() == 1) {
308 schedulePortStatRequestTask();