MD-SAL Statistics Manager - Adding support for individual flow stats
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / StatisticsProvider.java
1 /*
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.statistics.manager;
9
10 import java.util.ArrayList;
11 import java.util.List;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.Future;
16
17 import org.eclipse.xtext.xbase.lib.Exceptions;
18 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
19 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
20 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.TableId;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
44 import org.opendaylight.yangtools.concepts.Registration;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.opendaylight.yangtools.yang.binding.NotificationListener;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 public class StatisticsProvider implements AutoCloseable {
52
53     public final static Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
54     
55     private DataProviderService dps;
56
57     private NotificationProviderService nps;
58     
59     private OpendaylightGroupStatisticsService groupStatsService;
60     
61     private OpendaylightMeterStatisticsService meterStatsService;
62     
63     private OpendaylightFlowStatisticsService flowStatsService;
64     
65     private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
66     
67     private Thread statisticsRequesterThread;
68     
69     private final  InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
70     
71     private final int STATS_THREAD_EXECUTION_TIME= 50000;
72     //Local caching of stats
73     
74     private final ConcurrentMap<NodeId,NodeStatistics> statisticsCache = 
75             new ConcurrentHashMap<NodeId,NodeStatistics>();
76     
77     public DataProviderService getDataService() {
78       return this.dps;
79     }
80     
81     public void setDataService(final DataProviderService dataService) {
82       this.dps = dataService;
83     }
84     
85     public NotificationProviderService getNotificationService() {
86       return this.nps;
87     }
88     
89     public void setNotificationService(final NotificationProviderService notificationService) {
90       this.nps = notificationService;
91     }
92
93     public MultipartMessageManager getMultipartMessageManager() {
94         return multipartMessageManager;
95     }
96
97     private final StatisticsUpdateCommiter updateCommiter = new StatisticsUpdateCommiter(StatisticsProvider.this);
98     
99     private Registration<NotificationListener> listenerRegistration;
100     
101     public void start() {
102         
103         NotificationProviderService nps = this.getNotificationService();
104         Registration<NotificationListener> registerNotificationListener = nps.registerNotificationListener(this.updateCommiter);
105         this.listenerRegistration = registerNotificationListener;
106         
107         // Get Group/Meter statistics service instance
108         groupStatsService = StatisticsManagerActivator.getProviderContext().
109                 getRpcService(OpendaylightGroupStatisticsService.class);
110         
111         meterStatsService = StatisticsManagerActivator.getProviderContext().
112                 getRpcService(OpendaylightMeterStatisticsService.class);
113         
114         flowStatsService = StatisticsManagerActivator.getProviderContext().
115                 getRpcService(OpendaylightFlowStatisticsService.class);
116
117         statisticsRequesterThread = new Thread( new Runnable(){
118
119             @Override
120             public void run() {
121                 while(true){
122                     try {
123                         statsRequestSender();
124                         
125                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
126                     }catch (Exception e){
127                         spLogger.error("Exception occurred while sending stats request : {}",e);
128                     }
129                 }
130             }
131         });
132         
133         spLogger.debug("Statistics requester thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
134         
135         statisticsRequesterThread.start();
136         
137         spLogger.info("Statistics Provider started.");
138     }
139     
140     protected DataModificationTransaction startChange() {
141         
142         DataProviderService dps = this.getDataService();
143         return dps.beginTransaction();
144     }
145     
146     private void statsRequestSender(){
147         
148         //Need to call API to receive all the nodes connected to controller.
149         List<Node> targetNodes = getAllConnectedNodes();
150         
151         if(targetNodes == null)
152             return;
153
154         for (Node targetNode : targetNodes){
155             
156             InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
157             NodeRef targetNodeRef = new NodeRef(targetInstanceId);
158             
159             try {
160                 
161                 sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
162
163                 sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
164
165             }catch(Exception e){
166                 spLogger.error("Exception occured while sending flow statistics request : {}",e);
167             }
168
169             if(targetNode.getAugmentation(FlowCapableNode.class) != null){
170
171                 spLogger.info("Send request for stats collection to node : {})",targetNode.getId());
172                 
173                 try{
174                   sendAllGroupStatisticsRequest(targetNodeRef);
175                   Thread.sleep(1000);
176                   sendAllMeterStatisticsRequest(targetNodeRef);
177                   Thread.sleep(1000);
178                   sendGroupDescriptionRequest(targetNodeRef);
179                   Thread.sleep(1000);
180                   sendMeterConfigStatisticsRequest(targetNodeRef);
181                   Thread.sleep(1000);
182                 }catch(Exception e){
183                     spLogger.error("Exception occured while sending statistics request : {}", e);
184                 }
185             }
186         }
187     }
188     
189     private void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode){
190         final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
191                 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
192         
193         input.setNode(targetNode);
194         
195         @SuppressWarnings("unused")
196         Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response = 
197                 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
198         
199     }
200     
201     private void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
202         
203         List<Short> tablesId = getTablesFromNode(targetNodeKey);
204         
205         if(tablesId.size() != 0){
206             for(Short id : tablesId){
207                 
208                 spLogger.info("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey);
209                 GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = 
210                         new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
211                 
212                 input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
213                 input.setTableId(new TableId(id));
214                 Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response = 
215                         flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
216                 
217                 multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), id);
218             }
219         }
220         
221         //Note: Just for testing, because i am not able to fetch table list from datastore
222         // Bug-225 is raised for investigation.
223         
224 //                spLogger.info("Send aggregate stats request for flow table {} to node {}",1,targetNodeKey);
225 //                GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = 
226 //                        new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
227 //                
228 //                input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
229 //                input.setTableId(new TableId((short)1));
230 //                Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response = 
231 //                        flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());`
232 //                
233 //                multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), (short)1);
234     }
235
236     private void sendAllGroupStatisticsRequest(NodeRef targetNode){
237         
238         final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
239         
240         input.setNode(targetNode);
241
242         @SuppressWarnings("unused")
243         Future<RpcResult<GetAllGroupStatisticsOutput>> response = 
244                 groupStatsService.getAllGroupStatistics(input.build());
245     }
246     
247     private void sendGroupDescriptionRequest(NodeRef targetNode){
248         final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
249         
250         input.setNode(targetNode);
251
252         @SuppressWarnings("unused")
253         Future<RpcResult<GetGroupDescriptionOutput>> response = 
254                 groupStatsService.getGroupDescription(input.build());
255     }
256     
257     private void sendAllMeterStatisticsRequest(NodeRef targetNode){
258         
259         GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
260         
261         input.setNode(targetNode);
262
263         @SuppressWarnings("unused")
264         Future<RpcResult<GetAllMeterStatisticsOutput>> response = 
265                 meterStatsService.getAllMeterStatistics(input.build());
266     }
267     
268     private void sendMeterConfigStatisticsRequest(NodeRef targetNode){
269         
270         GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
271         
272         input.setNode(targetNode);
273
274         @SuppressWarnings("unused")
275         Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response = 
276                 meterStatsService.getAllMeterConfigStatistics(input.build());
277         
278     }
279     
280     public ConcurrentMap<NodeId, NodeStatistics> getStatisticsCache() {
281         return statisticsCache;
282     }
283     
284     private List<Node> getAllConnectedNodes(){
285         
286         Nodes nodes = (Nodes) dps.readOperationalData(nodesIdentifier);
287         if(nodes == null)
288             return null;
289         
290         spLogger.info("Number of connected nodes : {}",nodes.getNode().size());
291         return nodes.getNode();
292     }
293     
294     private List<Short> getTablesFromNode(NodeKey nodeKey){
295         InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
296         
297         FlowCapableNode node = (FlowCapableNode)dps.readConfigurationData(nodesIdentifier);
298         List<Short> tablesId = new ArrayList<Short>();
299         if(node != null && node.getTable()!=null){
300             spLogger.info("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
301             for(Table table: node.getTable()){
302                 tablesId.add(table.getId());
303             }
304         }
305         return tablesId;
306     }
307
308     @SuppressWarnings("deprecation")
309     @Override
310     public void close(){
311         
312         try {
313             spLogger.info("Statistics Provider stopped.");
314             if (this.listenerRegistration != null) {
315               
316                 this.listenerRegistration.close();
317                 
318                 this.statisticsRequesterThread.destroy();
319             
320             }
321           } catch (Throwable e) {
322             throw Exceptions.sneakyThrow(e);
323           }
324     }
325
326 }