Merge "Fixed netconf monitoring."
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / StatisticsProvider.java
1 /*
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.statistics.manager;
9
10 import java.util.ArrayList;
11 import java.util.List;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.Future;
16
17 import org.eclipse.xtext.xbase.lib.Exceptions;
18 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
19 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
20 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllPortsStatisticsInputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllPortsStatisticsOutput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
49 import org.opendaylight.yangtools.concepts.Registration;
50 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
51 import org.opendaylight.yangtools.yang.binding.NotificationListener;
52 import org.opendaylight.yangtools.yang.common.RpcResult;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 public class StatisticsProvider implements AutoCloseable {
57
58     public final static Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
59     
60     private DataProviderService dps;
61
62     private NotificationProviderService nps;
63     
64     private OpendaylightGroupStatisticsService groupStatsService;
65     
66     private OpendaylightMeterStatisticsService meterStatsService;
67     
68     private OpendaylightFlowStatisticsService flowStatsService;
69     
70     private OpendaylightPortStatisticsService portStatsService;
71
72     private OpendaylightFlowTableStatisticsService flowTableStatsService;
73
74     private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
75     
76     private Thread statisticsRequesterThread;
77     
78     private final  InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
79     
80     private final int STATS_THREAD_EXECUTION_TIME= 50000;
81     //Local caching of stats
82     
83     private final ConcurrentMap<NodeId,NodeStatistics> statisticsCache = 
84             new ConcurrentHashMap<NodeId,NodeStatistics>();
85     
86     public DataProviderService getDataService() {
87       return this.dps;
88     }
89     
90     public void setDataService(final DataProviderService dataService) {
91       this.dps = dataService;
92     }
93     
94     public NotificationProviderService getNotificationService() {
95       return this.nps;
96     }
97     
98     public void setNotificationService(final NotificationProviderService notificationService) {
99       this.nps = notificationService;
100     }
101
102     public MultipartMessageManager getMultipartMessageManager() {
103         return multipartMessageManager;
104     }
105
106     private final StatisticsUpdateCommiter updateCommiter = new StatisticsUpdateCommiter(StatisticsProvider.this);
107     
108     private Registration<NotificationListener> listenerRegistration;
109     
110     public void start() {
111         
112         NotificationProviderService nps = this.getNotificationService();
113         Registration<NotificationListener> registerNotificationListener = nps.registerNotificationListener(this.updateCommiter);
114         this.listenerRegistration = registerNotificationListener;
115         
116         // Get Group/Meter statistics service instance
117         groupStatsService = StatisticsManagerActivator.getProviderContext().
118                 getRpcService(OpendaylightGroupStatisticsService.class);
119         
120         meterStatsService = StatisticsManagerActivator.getProviderContext().
121                 getRpcService(OpendaylightMeterStatisticsService.class);
122         
123         flowStatsService = StatisticsManagerActivator.getProviderContext().
124                 getRpcService(OpendaylightFlowStatisticsService.class);
125
126         portStatsService = StatisticsManagerActivator.getProviderContext().
127                 getRpcService(OpendaylightPortStatisticsService.class);
128
129         flowTableStatsService = StatisticsManagerActivator.getProviderContext().
130                 getRpcService(OpendaylightFlowTableStatisticsService.class);
131         
132         statisticsRequesterThread = new Thread( new Runnable(){
133
134             @Override
135             public void run() {
136                 while(true){
137                     try {
138                         statsRequestSender();
139                         
140                         Thread.sleep(STATS_THREAD_EXECUTION_TIME);
141                     }catch (Exception e){
142                         spLogger.error("Exception occurred while sending stats request : {}",e);
143                     }
144                 }
145             }
146         });
147         
148         spLogger.debug("Statistics requester thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
149         
150         statisticsRequesterThread.start();
151         
152         spLogger.info("Statistics Provider started.");
153     }
154     
155     protected DataModificationTransaction startChange() {
156         
157         DataProviderService dps = this.getDataService();
158         return dps.beginTransaction();
159     }
160     
161     private void statsRequestSender(){
162         
163         List<Node> targetNodes = getAllConnectedNodes();
164         
165         if(targetNodes == null)
166             return;
167         
168
169         for (Node targetNode : targetNodes){
170             
171             InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
172             NodeRef targetNodeRef = new NodeRef(targetInstanceId);
173             
174             try {
175                 
176                 sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
177
178                 sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
179
180                 sendAllPortStatisticsRequest(targetNodeRef);
181                 
182                 sendAllFlowTablesStatisticsRequest(targetNodeRef);
183
184             }catch(Exception e){
185                 spLogger.error("Exception occured while sending statistics requests : {}",e);
186             }
187
188             if(targetNode.getAugmentation(FlowCapableNode.class) != null){
189
190                 spLogger.info("Send request for stats collection to node : {})",targetNode.getId());
191
192                 try{
193                   sendAllGroupStatisticsRequest(targetNodeRef);
194                   Thread.sleep(1000);
195                   sendAllMeterStatisticsRequest(targetNodeRef);
196                   Thread.sleep(1000);
197                   sendGroupDescriptionRequest(targetNodeRef);
198                   Thread.sleep(1000);
199                   sendMeterConfigStatisticsRequest(targetNodeRef);
200                   Thread.sleep(1000);
201                 }catch(Exception e){
202                     spLogger.error("Exception occured while sending statistics requests : {}", e);
203                 }
204             }
205         }
206     }
207
208     private void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) {
209         final GetFlowTablesStatisticsInputBuilder input = 
210                 new GetFlowTablesStatisticsInputBuilder();
211         
212         input.setNode(targetNodeRef);
213
214         @SuppressWarnings("unused")
215         Future<RpcResult<GetFlowTablesStatisticsOutput>> response = 
216                 flowTableStatsService.getFlowTablesStatistics(input.build());
217     }
218
219     private void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode){
220         final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
221                 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
222         
223         input.setNode(targetNode);
224         
225         @SuppressWarnings("unused")
226         Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response = 
227                 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
228         
229     }
230     
231     private void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
232         
233         List<Short> tablesId = getTablesFromNode(targetNodeKey);
234         
235         if(tablesId.size() != 0){
236             for(Short id : tablesId){
237                 
238                 spLogger.info("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey);
239                 GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = 
240                         new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
241                 
242                 input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
243                 input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(id));
244                 Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response = 
245                         flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
246                 
247                 multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), id);
248             }
249         }
250         
251         //Note: Just for testing, because i am not able to fetch table list from datastore
252         // Bug-225 is raised for investigation.
253         
254 //                spLogger.info("Send aggregate stats request for flow table {} to node {}",1,targetNodeKey);
255 //                GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = 
256 //                        new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
257 //                
258 //                input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
259 //                input.setTableId(new TableId((short)1));
260 //                Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response = 
261 //                        flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());`
262 //                
263 //                multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), (short)1);
264     }
265
266     private void sendAllPortStatisticsRequest(NodeRef targetNode){
267         
268         final GetAllPortsStatisticsInputBuilder input = new GetAllPortsStatisticsInputBuilder();
269         
270         input.setNode(targetNode);
271
272         @SuppressWarnings("unused")
273         Future<RpcResult<GetAllPortsStatisticsOutput>> response = 
274                 portStatsService.getAllPortsStatistics(input.build());
275     }
276
277     private void sendAllGroupStatisticsRequest(NodeRef targetNode){
278         
279         final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
280         
281         input.setNode(targetNode);
282
283         @SuppressWarnings("unused")
284         Future<RpcResult<GetAllGroupStatisticsOutput>> response = 
285                 groupStatsService.getAllGroupStatistics(input.build());
286     }
287     
288     private void sendGroupDescriptionRequest(NodeRef targetNode){
289         final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
290         
291         input.setNode(targetNode);
292
293         @SuppressWarnings("unused")
294         Future<RpcResult<GetGroupDescriptionOutput>> response = 
295                 groupStatsService.getGroupDescription(input.build());
296     }
297     
298     private void sendAllMeterStatisticsRequest(NodeRef targetNode){
299         
300         GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
301         
302         input.setNode(targetNode);
303
304         @SuppressWarnings("unused")
305         Future<RpcResult<GetAllMeterStatisticsOutput>> response = 
306                 meterStatsService.getAllMeterStatistics(input.build());
307     }
308     
309     private void sendMeterConfigStatisticsRequest(NodeRef targetNode){
310         
311         GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
312         
313         input.setNode(targetNode);
314
315         @SuppressWarnings("unused")
316         Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response = 
317                 meterStatsService.getAllMeterConfigStatistics(input.build());
318         
319     }
320     
321     public ConcurrentMap<NodeId, NodeStatistics> getStatisticsCache() {
322         return statisticsCache;
323     }
324     
325     private List<Node> getAllConnectedNodes(){
326         
327         Nodes nodes = (Nodes) dps.readOperationalData(nodesIdentifier);
328         if(nodes == null)
329             return null;
330         
331         spLogger.info("Number of connected nodes : {}",nodes.getNode().size());
332         return nodes.getNode();
333     }
334     
335     private List<Short> getTablesFromNode(NodeKey nodeKey){
336         InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
337         
338         FlowCapableNode node = (FlowCapableNode)dps.readConfigurationData(nodesIdentifier);
339         List<Short> tablesId = new ArrayList<Short>();
340         if(node != null && node.getTable()!=null){
341             spLogger.info("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
342             for(Table table: node.getTable()){
343                 tablesId.add(table.getId());
344             }
345         }
346         return tablesId;
347     }
348
349     @SuppressWarnings("deprecation")
350     @Override
351     public void close(){
352         
353         try {
354             spLogger.info("Statistics Provider stopped.");
355             if (this.listenerRegistration != null) {
356               
357                 this.listenerRegistration.close();
358                 
359                 this.statisticsRequesterThread.destroy();
360             
361             }
362           } catch (Throwable e) {
363             throw Exceptions.sneakyThrow(e);
364           }
365     }
366
367 }