Merge "Spec to support load balancing and high availability of multiple VxLAN tunnels"
[genius.git] / interfacemanager / interfacemanager-impl / src / main / java / org / opendaylight / genius / interfacemanager / pmcounters / NodeConnectorStatsImpl.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.interfacemanager.pmcounters;
9
10 import com.google.common.util.concurrent.ThreadFactoryBuilder;
11 import java.math.BigInteger;
12 import java.util.ArrayList;
13 import java.util.HashMap;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.ScheduledFuture;
20 import java.util.concurrent.ThreadFactory;
21 import java.util.concurrent.TimeUnit;
22 import javax.inject.Inject;
23 import javax.inject.Singleton;
24 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
25 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.genius.mdsalutil.AbstractDataChangeListener;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdate;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
47 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 @Singleton
52 public class NodeConnectorStatsImpl extends AbstractDataChangeListener<Node>{
53
54     private static final Logger logger = LoggerFactory.getLogger(NodeConnectorStatsImpl.class);
55     private static final String STATS_POLL_FLAG = "interfacemgr.pmcounters.poll";
56     private static final int THREAD_POOL_SIZE = 4;
57     private static final int NO_DELAY = 0;
58     public static final PMAgentForNodeConnectorCounters pmagent = new PMAgentForNodeConnectorCounters();
59     private PortRpcStatisticsListener portStatsListener = new PortRpcStatisticsListener();
60     private FlowRpcStatisticsListener flowTableStatsListener = new FlowRpcStatisticsListener();
61     private List<BigInteger> nodes = new ArrayList<>();
62     Map<String, Map<String, String>> nodeAndNcIdOFPortDurationMap = new ConcurrentHashMap<>();
63     Map<String, Map<String, String>> nodeAndNcIdOFPortReceiveDropMap = new ConcurrentHashMap<>();
64     Map<String, Map<String, String>> nodeAndNcIdOFPortReceiveError = new ConcurrentHashMap<>();
65     Map<String, Map<String, String>> nodeAndNcIdPacketSentMap = new ConcurrentHashMap<>();
66     Map<String, Map<String, String>> nodeAndNcIdPacketReceiveMap = new ConcurrentHashMap<>();
67     Map<String, Map<String, String>> nodeAndNcIdBytesSentMap = new ConcurrentHashMap<>();
68     Map<String, Map<String, String>> nodeAndNcIdBytesReceiveMap = new ConcurrentHashMap<>();
69     Map<String, Map<String, String>> nodeAndEntriesPerOFTableMap = new ConcurrentHashMap<>();
70     private ScheduledFuture<?> scheduledResult;
71     private final OpendaylightPortStatisticsService statPortService;
72     private ScheduledExecutorService portStatExecutorService;
73     private final OpendaylightFlowTableStatisticsService opendaylightFlowTableStatisticsService;
74
75     @Inject
76     public NodeConnectorStatsImpl(DataBroker dataBroker, NotificationService notificationService,
77                                   final OpendaylightPortStatisticsService opendaylightPortStatisticsService,
78                                   final OpendaylightFlowTableStatisticsService opendaylightFlowTableStatisticsService) {
79         super(Node.class);
80         this.statPortService = opendaylightPortStatisticsService;
81         this.opendaylightFlowTableStatisticsService = opendaylightFlowTableStatisticsService;
82         registerListener(dataBroker);
83         portStatExecutorService = Executors.newScheduledThreadPool(THREAD_POOL_SIZE, getThreadFactory("Port Stats Request Task"));
84         notificationService.registerNotificationListener(portStatsListener);
85         notificationService.registerNotificationListener(flowTableStatsListener);
86         pmagent.registerMbean();
87     }
88
89     private void registerListener(final DataBroker db) {
90         try {
91             db.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
92                     getWildCardPath(), NodeConnectorStatsImpl.this, AsyncDataBroker.DataChangeScope.SUBTREE);
93         } catch (final Exception e) {
94             logger.error("NodeConnectorStatsImpl: DataChange listener registration fail!", e);
95             throw new IllegalStateException("NodeConnectorStatsImpl: registration Listener failed.", e);
96         }
97     }
98
99     private InstanceIdentifier<Node> getWildCardPath() {
100         return InstanceIdentifier.create(Nodes.class).child(Node.class);
101     }
102
103     /*
104      * PortStat request task is started when first DPN gets connected
105      */
106     private void schedulePortStatRequestTask() {
107         if (!Boolean.getBoolean(STATS_POLL_FLAG)) {
108             logger.info("Port statistics is turned off");
109             return;
110         }
111         logger.info("Scheduling port statistics request");
112         PortStatRequestTask portStatRequestTask = new PortStatRequestTask();
113         scheduledResult = portStatExecutorService.scheduleAtFixedRate(portStatRequestTask, NO_DELAY, 10000, TimeUnit.MILLISECONDS);
114     }
115
116     /*
117      * PortStat request task is stopped when last DPN is removed.
118      */
119     private void stopPortStatRequestTask() {
120         if(scheduledResult != null) {
121             logger.info("Stopping port statistics request");
122             scheduledResult.cancel(true);
123         }
124     }
125
126     /*
127      * This task queries for node connector statistics as well as flowtables statistics every 10 secs.
128      * Minimum period which can be configured for PMJob is 10 secs.
129      */
130     private class PortStatRequestTask implements Runnable {
131
132         @Override
133         public void run() {
134             if(logger.isTraceEnabled()) {
135                 logger.trace("Requesting port stats - {}");
136             }
137             for (BigInteger node : nodes) {
138                 logger.trace("Requesting AllNodeConnectorStatistics for node - {}", node);
139                 statPortService.getAllNodeConnectorsStatistics(buildGetAllNodeConnectorStatistics(node));
140                 opendaylightFlowTableStatisticsService.getFlowTablesStatistics(buildGetFlowTablesStatistics(node));
141             }
142         }
143
144         private GetAllNodeConnectorsStatisticsInput buildGetAllNodeConnectorStatistics(BigInteger dpId) {
145             return new GetAllNodeConnectorsStatisticsInputBuilder()
146                     .setNode(
147                             new NodeRef(InstanceIdentifier.builder(Nodes.class)
148                                     .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId.toString()))).build())).build();
149         }
150
151         private GetFlowTablesStatisticsInput buildGetFlowTablesStatistics(BigInteger dpId) {
152             return new GetFlowTablesStatisticsInputBuilder()
153                     .setNode(
154                             new NodeRef(InstanceIdentifier.builder(Nodes.class)
155                                     .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId.toString()))).build())).build();
156         }
157
158     }
159
160     private ThreadFactory getThreadFactory(String threadNameFormat) {
161         ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
162         builder.setNameFormat(threadNameFormat);
163         builder.setUncaughtExceptionHandler(
164                 (t, e) -> logger.error("Received Uncaught Exception event in Thread: {}", t.getName(), e));
165         return builder.build();
166     }
167
168     /*
169      * PortRpcStatisticsListener listens for the NodeConnectorStatisticsUpdate and then update the corresponding counter map
170      */
171     class PortRpcStatisticsListener implements OpendaylightPortStatisticsListener {
172
173         @Override
174         public void onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate ncStats) {
175             Map<String, String> ncIdOFPortDurationMap = new HashMap<>();
176             Map<String, String> ncIdOFPortReceiveDropMap = new HashMap<>();
177             Map<String, String> ncIdOFPortReceiveError = new HashMap<>();
178             Map<String, String> ncIdPacketSentMap = new HashMap<>();
179             Map<String, String> ncIdPacketReceiveMap = new HashMap<>();
180             Map<String, String> ncIdBytesSentMap = new HashMap<>();
181             Map<String, String> ncIdBytesReceiveMap = new HashMap<>();
182             List<NodeConnectorStatisticsAndPortNumberMap> ncStatsAndPortMapList = ncStats.getNodeConnectorStatisticsAndPortNumberMap();
183             NodeId nodeId = ncStats.getId();
184             String node = nodeId.getValue().split(":")[1];
185             for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : ncStatsAndPortMapList) {
186                 NodeConnectorId nodeConnector = ncStatsAndPortMap.getNodeConnectorId();
187                 String port = nodeConnector.getValue().split(":")[2];
188                 String nodePortStr = "dpnId_" + node + "_portNum_" + port;
189                 ncIdOFPortDurationMap.put("OFPortDuration:" + nodePortStr + "_OFPortDuration", ncStatsAndPortMap.getDuration().getSecond().getValue().toString());
190                 ncIdOFPortReceiveDropMap.put("PacketsPerOFPortReceiveDrop:" + nodePortStr + "_PacketsPerOFPortReceiveDrop", ncStatsAndPortMap.getReceiveDrops().toString());
191                 ncIdOFPortReceiveError.put("PacketsPerOFPortReceiveError:" + nodePortStr + "_PacketsPerOFPortReceiveError", ncStatsAndPortMap.getReceiveErrors().toString());
192                 ncIdPacketSentMap.put("PacketsPerOFPortSent:" + nodePortStr + "_PacketsPerOFPortSent", ncStatsAndPortMap.getPackets().getTransmitted().toString());
193                 ncIdPacketReceiveMap.put("PacketsPerOFPortReceive:" + nodePortStr + "_PacketsPerOFPortReceive", ncStatsAndPortMap.getPackets().getReceived().toString());
194                 ncIdBytesSentMap.put("BytesPerOFPortSent:" + nodePortStr + "_BytesPerOFPortSent", ncStatsAndPortMap.getBytes().getTransmitted().toString());
195                 ncIdBytesReceiveMap.put("BytesPerOFPortReceive:" + nodePortStr + "_BytesPerOFPortReceive", ncStatsAndPortMap.getBytes().getReceived().toString());
196             }
197             logger.trace("Port Stats {}", ncStatsAndPortMapList);
198             //Storing allNodeConnectorStats(like ncIdOFPortDurationMap) in a map with key as node for easy removal and addition of allNodeConnectorStats.
199             nodeAndNcIdOFPortDurationMap.put(node, ncIdOFPortDurationMap);
200             nodeAndNcIdOFPortReceiveDropMap.put(node, ncIdOFPortReceiveDropMap);
201             nodeAndNcIdOFPortReceiveError.put(node, ncIdOFPortReceiveError);
202             nodeAndNcIdPacketSentMap.put(node, ncIdPacketSentMap);
203             nodeAndNcIdPacketReceiveMap.put(node, ncIdPacketReceiveMap);
204             nodeAndNcIdBytesSentMap.put(node, ncIdBytesSentMap);
205             nodeAndNcIdBytesReceiveMap.put(node, ncIdBytesReceiveMap);
206             //Combining the stats of all nodeconnectors in all nodes. This Map will be stored under MBean which will be queried as regular intervals.
207             ncIdOFPortDurationMap = combineAllNodesStats(nodeAndNcIdOFPortDurationMap);
208             ncIdOFPortReceiveDropMap = combineAllNodesStats(nodeAndNcIdOFPortReceiveDropMap);
209             ncIdOFPortReceiveError = combineAllNodesStats(nodeAndNcIdOFPortReceiveError);
210             ncIdPacketSentMap = combineAllNodesStats(nodeAndNcIdPacketSentMap);
211             ncIdPacketReceiveMap = combineAllNodesStats(nodeAndNcIdPacketReceiveMap);
212             ncIdBytesSentMap = combineAllNodesStats(nodeAndNcIdBytesSentMap);
213             ncIdBytesReceiveMap = combineAllNodesStats(nodeAndNcIdBytesReceiveMap);
214             pmagent.connectToPMAgent(ncIdOFPortDurationMap, ncIdOFPortReceiveDropMap, ncIdOFPortReceiveError, ncIdPacketSentMap, ncIdPacketReceiveMap, ncIdBytesSentMap, ncIdBytesReceiveMap);
215         }
216
217         /*
218          * Input allNodesStats contains statistics of all nodeConnectors of all nodes. Key is the node and values contains another map with key as node connector and value as statresult.
219          * Output will be a map with key as nodeconnector and value as the statresult. The key contains nodeconnectors of all the nodes.
220          */
221     }
222
223     private Map<String, String> combineAllNodesStats(Map<String, Map<String, String>> allNodesStats) {
224         Map<String, String> allNcsStatsMap = new HashMap<>();
225         for (Map.Entry<String, Map<String, String>> entry : allNodesStats.entrySet()) {
226             Map<String, String> ncStatsMap = entry.getValue();
227             for (Map.Entry<String, String> statResult : ncStatsMap.entrySet()) {
228                 allNcsStatsMap.put(statResult.getKey(), statResult.getValue());
229             }
230         }
231         return allNcsStatsMap;
232     }
233
234     /*
235      * FlowRpcStatisticsListener listens for the FlowTableStatisticsUpdate and then update the corresponding counter map
236      */
237     class FlowRpcStatisticsListener implements OpendaylightFlowTableStatisticsListener {
238
239         @Override
240         public void onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate flowTableStats) {
241             String node = flowTableStats.getId().getValue().split(":")[1];
242             Map<String, String> entriesPerOFTableMap = new HashMap<>();
243             List<FlowTableAndStatisticsMap> flowTableAndStatisticsMapList = flowTableStats.getFlowTableAndStatisticsMap();
244             for (FlowTableAndStatisticsMap flowTableAndStatisticsMap : flowTableAndStatisticsMapList) {
245                 String nodeTableStr =  "dpnId_" + node + "_table_" + flowTableAndStatisticsMap.getTableId().getValue().toString();
246                 entriesPerOFTableMap.put("EntriesPerOFTable:" + nodeTableStr + "_EntriesPerOFTable", flowTableAndStatisticsMap.getActiveFlows().getValue().toString());
247             }
248             nodeAndEntriesPerOFTableMap.put(node, entriesPerOFTableMap);
249             entriesPerOFTableMap = combineAllNodesStats(nodeAndEntriesPerOFTableMap);
250             pmagent.connectToPMAgentAndInvokeEntriesPerOFTable(entriesPerOFTableMap);
251         }
252
253     }
254
255     @Override
256     protected void remove(InstanceIdentifier<Node> identifier, Node node) {
257         NodeId nodeId = node.getId();
258         String nodeVal = nodeId.getValue().split(":")[1];
259         BigInteger dpId = new BigInteger(nodeVal);
260         if (nodes.contains(dpId)) {
261             nodes.remove(dpId);
262             nodeAndNcIdOFPortDurationMap.remove(nodeVal);
263             nodeAndNcIdOFPortReceiveDropMap.remove(nodeVal);
264             nodeAndNcIdOFPortReceiveError.remove(nodeVal);
265             nodeAndNcIdPacketSentMap.remove(nodeVal);
266             nodeAndNcIdPacketReceiveMap.remove(nodeVal);
267             nodeAndNcIdBytesSentMap.remove(nodeVal);
268             nodeAndNcIdBytesReceiveMap.remove(nodeVal);
269             nodeAndEntriesPerOFTableMap.remove(nodeVal);
270         }
271         if (nodes.isEmpty()) {
272             stopPortStatRequestTask();
273         }
274     }
275
276     @Override
277     protected void update(InstanceIdentifier<Node> identifier, Node original,
278                           Node update) {
279         // TODO Auto-generated method stub
280     }
281
282     @Override
283     protected void add(InstanceIdentifier<Node> identifier, Node node) {
284         NodeId nodeId = node.getId();
285         BigInteger dpId = new BigInteger(nodeId.getValue().split(":")[1]);
286         if (nodes.contains(dpId)) {
287             return;
288         }
289         nodes.add(dpId);
290         if (nodes.size() == 1) {
291             schedulePortStatRequestTask();
292         }
293     }
294 }