Merge "Bug 8048 potential fix for ID duplication issue"
[genius.git] / interfacemanager / interfacemanager-impl / src / main / java / org / opendaylight / genius / interfacemanager / pmcounters / NodeConnectorStatsImpl.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.interfacemanager.pmcounters;
9
10 import com.google.common.util.concurrent.ThreadFactoryBuilder;
11 import java.math.BigInteger;
12 import java.util.ArrayList;
13 import java.util.HashMap;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.ScheduledFuture;
20 import java.util.concurrent.ThreadFactory;
21 import java.util.concurrent.TimeUnit;
22 import javax.inject.Inject;
23 import javax.inject.Singleton;
24 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
25 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdate;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
46 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 @Singleton
51 public class NodeConnectorStatsImpl extends AsyncDataTreeChangeListenerBase<Node, NodeConnectorStatsImpl> {
52     private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorStatsImpl.class);
53     private static final String STATS_POLL_FLAG = "interfacemgr.pmcounters.poll";
54     private static final int THREAD_POOL_SIZE = 4;
55     private static final int NO_DELAY = 0;
56     public static final PMAgentForNodeConnectorCounters PMAGENT = new PMAgentForNodeConnectorCounters();
57     private final PortRpcStatisticsListener portStatsListener = new PortRpcStatisticsListener();
58     private final FlowRpcStatisticsListener flowTableStatsListener = new FlowRpcStatisticsListener();
59     private final List<BigInteger> nodes = new ArrayList<>();
60     Map<String, Map<String, String>> nodeAndNcIdOFPortDurationMap = new ConcurrentHashMap<>();
61     Map<String, Map<String, String>> nodeAndNcIdOFPortReceiveDropMap = new ConcurrentHashMap<>();
62     Map<String, Map<String, String>> nodeAndNcIdOFPortReceiveError = new ConcurrentHashMap<>();
63     Map<String, Map<String, String>> nodeAndNcIdPacketSentMap = new ConcurrentHashMap<>();
64     Map<String, Map<String, String>> nodeAndNcIdPacketReceiveMap = new ConcurrentHashMap<>();
65     Map<String, Map<String, String>> nodeAndNcIdBytesSentMap = new ConcurrentHashMap<>();
66     Map<String, Map<String, String>> nodeAndNcIdBytesReceiveMap = new ConcurrentHashMap<>();
67     Map<String, Map<String, String>> nodeAndEntriesPerOFTableMap = new ConcurrentHashMap<>();
68     private ScheduledFuture<?> scheduledResult;
69     private final OpendaylightPortStatisticsService statPortService;
70     private final ScheduledExecutorService portStatExecutorService;
71     private final OpendaylightFlowTableStatisticsService opendaylightFlowTableStatisticsService;
72
73     @Inject
74     public NodeConnectorStatsImpl(DataBroker dataBroker, NotificationService notificationService,
75                                   final OpendaylightPortStatisticsService opendaylightPortStatisticsService,
76                                   final OpendaylightFlowTableStatisticsService opendaylightFlowTableStatisticsService) {
77         super(Node.class, NodeConnectorStatsImpl.class);
78         this.statPortService = opendaylightPortStatisticsService;
79         this.opendaylightFlowTableStatisticsService = opendaylightFlowTableStatisticsService;
80         registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker);
81         portStatExecutorService = Executors.newScheduledThreadPool(THREAD_POOL_SIZE,
82             getThreadFactory("Port Stats " + "Request Task"));
83         notificationService.registerNotificationListener(portStatsListener);
84         notificationService.registerNotificationListener(flowTableStatsListener);
85         PMAGENT.registerMbean();
86     }
87
88     @Override
89     public InstanceIdentifier<Node> getWildCardPath() {
90         return InstanceIdentifier.create(Nodes.class).child(Node.class);
91     }
92
93     @Override
94     protected NodeConnectorStatsImpl getDataTreeChangeListener() {
95         return NodeConnectorStatsImpl.this;
96     }
97
98     /*
99      * PortStat request task is started when first DPN gets connected
100      */
101     private void schedulePortStatRequestTask() {
102         if (!Boolean.getBoolean(STATS_POLL_FLAG)) {
103             LOG.info("Port statistics is turned off");
104             return;
105         }
106         LOG.info("Scheduling port statistics request");
107         PortStatRequestTask portStatRequestTask = new PortStatRequestTask();
108         scheduledResult = portStatExecutorService.scheduleAtFixedRate(portStatRequestTask, NO_DELAY, 10000,
109                 TimeUnit.MILLISECONDS);
110     }
111
112     /*
113      * PortStat request task is stopped when last DPN is removed.
114      */
115     private void stopPortStatRequestTask() {
116         if (scheduledResult != null) {
117             LOG.info("Stopping port statistics request");
118             scheduledResult.cancel(true);
119         }
120     }
121
122     /*
123      * This task queries for node connector statistics as well as flowtables
124      * statistics every 10 secs. Minimum period which can be configured for
125      * PMJob is 10 secs.
126      */
127     private class PortStatRequestTask implements Runnable {
128
129         @Override
130         public void run() {
131             if (LOG.isTraceEnabled()) {
132                 LOG.trace("Requesting port stats - {}");
133             }
134             for (BigInteger node : nodes) {
135                 LOG.trace("Requesting AllNodeConnectorStatistics for node - {}", node);
136                 statPortService.getAllNodeConnectorsStatistics(buildGetAllNodeConnectorStatistics(node));
137                 opendaylightFlowTableStatisticsService.getFlowTablesStatistics(buildGetFlowTablesStatistics(node));
138             }
139         }
140
141         private GetAllNodeConnectorsStatisticsInput buildGetAllNodeConnectorStatistics(BigInteger dpId) {
142             return new GetAllNodeConnectorsStatisticsInputBuilder()
143                     .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
144                             .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId.toString()))).build()))
145                     .build();
146         }
147
148         private GetFlowTablesStatisticsInput buildGetFlowTablesStatistics(BigInteger dpId) {
149             return new GetFlowTablesStatisticsInputBuilder()
150                     .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
151                             .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId.toString()))).build()))
152                     .build();
153         }
154     }
155
156     private ThreadFactory getThreadFactory(String threadNameFormat) {
157         ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
158         builder.setNameFormat(threadNameFormat);
159         builder.setUncaughtExceptionHandler((thread, exception) -> LOG
160                 .error("Received Uncaught Exception event in Thread: {}", thread.getName(), exception));
161         return builder.build();
162     }
163
164     /*
165      * PortRpcStatisticsListener listens for the NodeConnectorStatisticsUpdate
166      * and then update the corresponding counter map
167      */
168     class PortRpcStatisticsListener implements OpendaylightPortStatisticsListener {
169
170         @Override
171         public void onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate ncStats) {
172             Map<String, String> ncIdOFPortDurationMap = new HashMap<>();
173             Map<String, String> ncIdOFPortReceiveDropMap = new HashMap<>();
174             Map<String, String> ncIdOFPortReceiveError = new HashMap<>();
175             Map<String, String> ncIdPacketSentMap = new HashMap<>();
176             Map<String, String> ncIdPacketReceiveMap = new HashMap<>();
177             Map<String, String> ncIdBytesSentMap = new HashMap<>();
178             Map<String, String> ncIdBytesReceiveMap = new HashMap<>();
179             List<NodeConnectorStatisticsAndPortNumberMap> ncStatsAndPortMapList = ncStats
180                     .getNodeConnectorStatisticsAndPortNumberMap();
181             NodeId nodeId = ncStats.getId();
182             String node = nodeId.getValue().split(":")[1];
183             for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : ncStatsAndPortMapList) {
184                 NodeConnectorId nodeConnector = ncStatsAndPortMap.getNodeConnectorId();
185                 String port = nodeConnector.getValue().split(":")[2];
186                 String nodePortStr = "dpnId_" + node + "_portNum_" + port;
187                 ncIdOFPortDurationMap.put("OFPortDuration:" + nodePortStr + "_OFPortDuration",
188                         ncStatsAndPortMap.getDuration().getSecond().getValue().toString());
189                 ncIdOFPortReceiveDropMap.put(
190                         "PacketsPerOFPortReceiveDrop:" + nodePortStr + "_PacketsPerOFPortReceiveDrop",
191                         ncStatsAndPortMap.getReceiveDrops().toString());
192                 ncIdOFPortReceiveError.put(
193                         "PacketsPerOFPortReceiveError:" + nodePortStr + "_PacketsPerOFPortReceiveError",
194                         ncStatsAndPortMap.getReceiveErrors().toString());
195                 ncIdPacketSentMap.put("PacketsPerOFPortSent:" + nodePortStr + "_PacketsPerOFPortSent",
196                         ncStatsAndPortMap.getPackets().getTransmitted().toString());
197                 ncIdPacketReceiveMap.put("PacketsPerOFPortReceive:" + nodePortStr + "_PacketsPerOFPortReceive",
198                         ncStatsAndPortMap.getPackets().getReceived().toString());
199                 ncIdBytesSentMap.put("BytesPerOFPortSent:" + nodePortStr + "_BytesPerOFPortSent",
200                         ncStatsAndPortMap.getBytes().getTransmitted().toString());
201                 ncIdBytesReceiveMap.put("BytesPerOFPortReceive:" + nodePortStr + "_BytesPerOFPortReceive",
202                         ncStatsAndPortMap.getBytes().getReceived().toString());
203             }
204             LOG.trace("Port Stats {}", ncStatsAndPortMapList);
205             // Storing allNodeConnectorStats(like ncIdOFPortDurationMap) in a
206             // map with key as node for easy removal and addition of
207             // allNodeConnectorStats.
208             nodeAndNcIdOFPortDurationMap.put(node, ncIdOFPortDurationMap);
209             nodeAndNcIdOFPortReceiveDropMap.put(node, ncIdOFPortReceiveDropMap);
210             nodeAndNcIdOFPortReceiveError.put(node, ncIdOFPortReceiveError);
211             nodeAndNcIdPacketSentMap.put(node, ncIdPacketSentMap);
212             nodeAndNcIdPacketReceiveMap.put(node, ncIdPacketReceiveMap);
213             nodeAndNcIdBytesSentMap.put(node, ncIdBytesSentMap);
214             nodeAndNcIdBytesReceiveMap.put(node, ncIdBytesReceiveMap);
215             // Combining the stats of all nodeconnectors in all nodes. This Map
216             // will be stored under MBean which will be queried as regular
217             // intervals.
218             ncIdOFPortDurationMap = combineAllNodesStats(nodeAndNcIdOFPortDurationMap);
219             ncIdOFPortReceiveDropMap = combineAllNodesStats(nodeAndNcIdOFPortReceiveDropMap);
220             ncIdOFPortReceiveError = combineAllNodesStats(nodeAndNcIdOFPortReceiveError);
221             ncIdPacketSentMap = combineAllNodesStats(nodeAndNcIdPacketSentMap);
222             ncIdPacketReceiveMap = combineAllNodesStats(nodeAndNcIdPacketReceiveMap);
223             ncIdBytesSentMap = combineAllNodesStats(nodeAndNcIdBytesSentMap);
224             ncIdBytesReceiveMap = combineAllNodesStats(nodeAndNcIdBytesReceiveMap);
225             PMAGENT.connectToPMAgent(ncIdOFPortDurationMap, ncIdOFPortReceiveDropMap, ncIdOFPortReceiveError,
226                     ncIdPacketSentMap, ncIdPacketReceiveMap, ncIdBytesSentMap, ncIdBytesReceiveMap);
227         }
228
229         /*
230          * Input allNodesStats contains statistics of all nodeConnectors of all
231          * nodes. Key is the node and values contains another map with key as
232          * node connector and value as statresult. Output will be a map with key
233          * as nodeconnector and value as the statresult. The key contains
234          * nodeconnectors of all the nodes.
235          */
236     }
237
238     private Map<String, String> combineAllNodesStats(Map<String, Map<String, String>> allNodesStats) {
239         Map<String, String> allNcsStatsMap = new HashMap<>();
240         for (Map.Entry<String, Map<String, String>> entry : allNodesStats.entrySet()) {
241             Map<String, String> ncStatsMap = entry.getValue();
242             for (Map.Entry<String, String> statResult : ncStatsMap.entrySet()) {
243                 allNcsStatsMap.put(statResult.getKey(), statResult.getValue());
244             }
245         }
246         return allNcsStatsMap;
247     }
248
249     /*
250      * FlowRpcStatisticsListener listens for the FlowTableStatisticsUpdate and
251      * then update the corresponding counter map
252      */
253     class FlowRpcStatisticsListener implements OpendaylightFlowTableStatisticsListener {
254
255         @Override
256         public void onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate flowTableStats) {
257             String node = flowTableStats.getId().getValue().split(":")[1];
258             Map<String, String> entriesPerOFTableMap = new HashMap<>();
259             List<FlowTableAndStatisticsMap> flowTableAndStatisticsMapList = flowTableStats
260                     .getFlowTableAndStatisticsMap();
261             for (FlowTableAndStatisticsMap flowTableAndStatisticsMap : flowTableAndStatisticsMapList) {
262                 String nodeTableStr = "dpnId_" + node + "_table_"
263                         + flowTableAndStatisticsMap.getTableId().getValue().toString();
264                 entriesPerOFTableMap.put("EntriesPerOFTable:" + nodeTableStr + "_EntriesPerOFTable",
265                         flowTableAndStatisticsMap.getActiveFlows().getValue().toString());
266             }
267             nodeAndEntriesPerOFTableMap.put(node, entriesPerOFTableMap);
268             entriesPerOFTableMap = combineAllNodesStats(nodeAndEntriesPerOFTableMap);
269             PMAGENT.connectToPMAgentAndInvokeEntriesPerOFTable(entriesPerOFTableMap);
270         }
271     }
272
273     @Override
274     protected void remove(InstanceIdentifier<Node> identifier, Node node) {
275         NodeId nodeId = node.getId();
276         String nodeVal = nodeId.getValue().split(":")[1];
277         BigInteger dpId = new BigInteger(nodeVal);
278         if (nodes.contains(dpId)) {
279             nodes.remove(dpId);
280             nodeAndNcIdOFPortDurationMap.remove(nodeVal);
281             nodeAndNcIdOFPortReceiveDropMap.remove(nodeVal);
282             nodeAndNcIdOFPortReceiveError.remove(nodeVal);
283             nodeAndNcIdPacketSentMap.remove(nodeVal);
284             nodeAndNcIdPacketReceiveMap.remove(nodeVal);
285             nodeAndNcIdBytesSentMap.remove(nodeVal);
286             nodeAndNcIdBytesReceiveMap.remove(nodeVal);
287             nodeAndEntriesPerOFTableMap.remove(nodeVal);
288         }
289         if (nodes.isEmpty()) {
290             stopPortStatRequestTask();
291         }
292     }
293
294     @Override
295     protected void update(InstanceIdentifier<Node> identifier, Node original, Node update) {
296         // TODO Auto-generated method stub
297     }
298
299     @Override
300     protected void add(InstanceIdentifier<Node> identifier, Node node) {
301         NodeId nodeId = node.getId();
302         BigInteger dpId = new BigInteger(nodeId.getValue().split(":")[1]);
303         if (nodes.contains(dpId)) {
304             return;
305         }
306         nodes.add(dpId);
307         if (nodes.size() == 1) {
308             schedulePortStatRequestTask();
309         }
310     }
311 }