2 * Copyright IBM Corporation, 2013. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.statistics.manager;
10 import java.util.ArrayList;
11 import java.util.List;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.Future;
17 import org.eclipse.xtext.xbase.lib.Exceptions;
18 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
19 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
20 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllPortsStatisticsInputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllPortsStatisticsOutput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
49 import org.opendaylight.yangtools.concepts.Registration;
50 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
51 import org.opendaylight.yangtools.yang.binding.NotificationListener;
52 import org.opendaylight.yangtools.yang.common.RpcResult;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
56 public class StatisticsProvider implements AutoCloseable {
58 public final static Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
60 private DataProviderService dps;
62 private NotificationProviderService nps;
64 private OpendaylightGroupStatisticsService groupStatsService;
66 private OpendaylightMeterStatisticsService meterStatsService;
68 private OpendaylightFlowStatisticsService flowStatsService;
70 private OpendaylightPortStatisticsService portStatsService;
72 private OpendaylightFlowTableStatisticsService flowTableStatsService;
74 private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
76 private Thread statisticsRequesterThread;
78 private final InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
80 private final int STATS_THREAD_EXECUTION_TIME= 50000;
81 //Local caching of stats
83 private final ConcurrentMap<NodeId,NodeStatistics> statisticsCache =
84 new ConcurrentHashMap<NodeId,NodeStatistics>();
86 public DataProviderService getDataService() {
90 public void setDataService(final DataProviderService dataService) {
91 this.dps = dataService;
94 public NotificationProviderService getNotificationService() {
98 public void setNotificationService(final NotificationProviderService notificationService) {
99 this.nps = notificationService;
102 public MultipartMessageManager getMultipartMessageManager() {
103 return multipartMessageManager;
106 private final StatisticsUpdateCommiter updateCommiter = new StatisticsUpdateCommiter(StatisticsProvider.this);
108 private Registration<NotificationListener> listenerRegistration;
110 public void start() {
112 NotificationProviderService nps = this.getNotificationService();
113 Registration<NotificationListener> registerNotificationListener = nps.registerNotificationListener(this.updateCommiter);
114 this.listenerRegistration = registerNotificationListener;
116 // Get Group/Meter statistics service instance
117 groupStatsService = StatisticsManagerActivator.getProviderContext().
118 getRpcService(OpendaylightGroupStatisticsService.class);
120 meterStatsService = StatisticsManagerActivator.getProviderContext().
121 getRpcService(OpendaylightMeterStatisticsService.class);
123 flowStatsService = StatisticsManagerActivator.getProviderContext().
124 getRpcService(OpendaylightFlowStatisticsService.class);
126 portStatsService = StatisticsManagerActivator.getProviderContext().
127 getRpcService(OpendaylightPortStatisticsService.class);
129 flowTableStatsService = StatisticsManagerActivator.getProviderContext().
130 getRpcService(OpendaylightFlowTableStatisticsService.class);
132 statisticsRequesterThread = new Thread( new Runnable(){
138 statsRequestSender();
140 Thread.sleep(STATS_THREAD_EXECUTION_TIME);
141 }catch (Exception e){
142 spLogger.error("Exception occurred while sending stats request : {}",e);
148 spLogger.debug("Statistics requester thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
150 statisticsRequesterThread.start();
152 spLogger.info("Statistics Provider started.");
155 protected DataModificationTransaction startChange() {
157 DataProviderService dps = this.getDataService();
158 return dps.beginTransaction();
161 private void statsRequestSender(){
163 List<Node> targetNodes = getAllConnectedNodes();
165 if(targetNodes == null)
169 for (Node targetNode : targetNodes){
171 InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
172 NodeRef targetNodeRef = new NodeRef(targetInstanceId);
176 sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
178 sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
180 sendAllPortStatisticsRequest(targetNodeRef);
182 sendAllFlowTablesStatisticsRequest(targetNodeRef);
185 spLogger.error("Exception occured while sending statistics requests : {}",e);
188 if(targetNode.getAugmentation(FlowCapableNode.class) != null){
190 spLogger.info("Send request for stats collection to node : {})",targetNode.getId());
193 sendAllGroupStatisticsRequest(targetNodeRef);
195 sendAllMeterStatisticsRequest(targetNodeRef);
197 sendGroupDescriptionRequest(targetNodeRef);
199 sendMeterConfigStatisticsRequest(targetNodeRef);
202 spLogger.error("Exception occured while sending statistics requests : {}", e);
208 private void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) {
209 final GetFlowTablesStatisticsInputBuilder input =
210 new GetFlowTablesStatisticsInputBuilder();
212 input.setNode(targetNodeRef);
214 @SuppressWarnings("unused")
215 Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
216 flowTableStatsService.getFlowTablesStatistics(input.build());
219 private void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode){
220 final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
221 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
223 input.setNode(targetNode);
225 @SuppressWarnings("unused")
226 Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
227 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
231 private void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
233 List<Short> tablesId = getTablesFromNode(targetNodeKey);
235 if(tablesId.size() != 0){
236 for(Short id : tablesId){
238 spLogger.info("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey);
239 GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
240 new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
242 input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
243 input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(id));
244 Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
245 flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
247 multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), id);
251 //Note: Just for testing, because i am not able to fetch table list from datastore
252 // Bug-225 is raised for investigation.
254 // spLogger.info("Send aggregate stats request for flow table {} to node {}",1,targetNodeKey);
255 // GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
256 // new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
258 // input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
259 // input.setTableId(new TableId((short)1));
260 // Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
261 // flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());`
263 // multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), (short)1);
266 private void sendAllPortStatisticsRequest(NodeRef targetNode){
268 final GetAllPortsStatisticsInputBuilder input = new GetAllPortsStatisticsInputBuilder();
270 input.setNode(targetNode);
272 @SuppressWarnings("unused")
273 Future<RpcResult<GetAllPortsStatisticsOutput>> response =
274 portStatsService.getAllPortsStatistics(input.build());
277 private void sendAllGroupStatisticsRequest(NodeRef targetNode){
279 final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
281 input.setNode(targetNode);
283 @SuppressWarnings("unused")
284 Future<RpcResult<GetAllGroupStatisticsOutput>> response =
285 groupStatsService.getAllGroupStatistics(input.build());
288 private void sendGroupDescriptionRequest(NodeRef targetNode){
289 final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
291 input.setNode(targetNode);
293 @SuppressWarnings("unused")
294 Future<RpcResult<GetGroupDescriptionOutput>> response =
295 groupStatsService.getGroupDescription(input.build());
298 private void sendAllMeterStatisticsRequest(NodeRef targetNode){
300 GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
302 input.setNode(targetNode);
304 @SuppressWarnings("unused")
305 Future<RpcResult<GetAllMeterStatisticsOutput>> response =
306 meterStatsService.getAllMeterStatistics(input.build());
309 private void sendMeterConfigStatisticsRequest(NodeRef targetNode){
311 GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
313 input.setNode(targetNode);
315 @SuppressWarnings("unused")
316 Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
317 meterStatsService.getAllMeterConfigStatistics(input.build());
321 public ConcurrentMap<NodeId, NodeStatistics> getStatisticsCache() {
322 return statisticsCache;
325 private List<Node> getAllConnectedNodes(){
327 Nodes nodes = (Nodes) dps.readOperationalData(nodesIdentifier);
331 spLogger.info("Number of connected nodes : {}",nodes.getNode().size());
332 return nodes.getNode();
335 private List<Short> getTablesFromNode(NodeKey nodeKey){
336 InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
338 FlowCapableNode node = (FlowCapableNode)dps.readConfigurationData(nodesIdentifier);
339 List<Short> tablesId = new ArrayList<Short>();
340 if(node != null && node.getTable()!=null){
341 spLogger.info("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
342 for(Table table: node.getTable()){
343 tablesId.add(table.getId());
349 @SuppressWarnings("deprecation")
354 spLogger.info("Statistics Provider stopped.");
355 if (this.listenerRegistration != null) {
357 this.listenerRegistration.close();
359 this.statisticsRequesterThread.destroy();
362 } catch (Throwable e) {
363 throw Exceptions.sneakyThrow(e);