/*
 * Copyright IBM Corporation, 2013. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.md.statistics.manager;
10 import java.util.ArrayList;
11 import java.util.List;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.Future;
17 import org.eclipse.xtext.xbase.lib.Exceptions;
18 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
19 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
20 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.TableId;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
44 import org.opendaylight.yangtools.concepts.Registration;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.opendaylight.yangtools.yang.binding.NotificationListener;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 public class StatisticsProvider implements AutoCloseable {
53 public final static Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
55 private DataProviderService dps;
57 private NotificationProviderService nps;
59 private OpendaylightGroupStatisticsService groupStatsService;
61 private OpendaylightMeterStatisticsService meterStatsService;
63 private OpendaylightFlowStatisticsService flowStatsService;
65 private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
67 private Thread statisticsRequesterThread;
69 private final InstanceIdentifier<Nodes> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).toInstance();
71 private final int STATS_THREAD_EXECUTION_TIME= 50000;
72 //Local caching of stats
74 private final ConcurrentMap<NodeId,NodeStatistics> statisticsCache =
75 new ConcurrentHashMap<NodeId,NodeStatistics>();
77 public DataProviderService getDataService() {
81 public void setDataService(final DataProviderService dataService) {
82 this.dps = dataService;
85 public NotificationProviderService getNotificationService() {
89 public void setNotificationService(final NotificationProviderService notificationService) {
90 this.nps = notificationService;
93 public MultipartMessageManager getMultipartMessageManager() {
94 return multipartMessageManager;
97 private final StatisticsUpdateCommiter updateCommiter = new StatisticsUpdateCommiter(StatisticsProvider.this);
99 private Registration<NotificationListener> listenerRegistration;
101 public void start() {
103 NotificationProviderService nps = this.getNotificationService();
104 Registration<NotificationListener> registerNotificationListener = nps.registerNotificationListener(this.updateCommiter);
105 this.listenerRegistration = registerNotificationListener;
107 // Get Group/Meter statistics service instance
108 groupStatsService = StatisticsManagerActivator.getProviderContext().
109 getRpcService(OpendaylightGroupStatisticsService.class);
111 meterStatsService = StatisticsManagerActivator.getProviderContext().
112 getRpcService(OpendaylightMeterStatisticsService.class);
114 flowStatsService = StatisticsManagerActivator.getProviderContext().
115 getRpcService(OpendaylightFlowStatisticsService.class);
117 statisticsRequesterThread = new Thread( new Runnable(){
123 statsRequestSender();
125 Thread.sleep(STATS_THREAD_EXECUTION_TIME);
126 }catch (Exception e){
127 spLogger.error("Exception occurred while sending stats request : {}",e);
133 spLogger.debug("Statistics requester thread started with timer interval : {}",STATS_THREAD_EXECUTION_TIME);
135 statisticsRequesterThread.start();
137 spLogger.info("Statistics Provider started.");
140 protected DataModificationTransaction startChange() {
142 DataProviderService dps = this.getDataService();
143 return dps.beginTransaction();
146 private void statsRequestSender(){
148 //Need to call API to receive all the nodes connected to controller.
149 List<Node> targetNodes = getAllConnectedNodes();
151 if(targetNodes == null)
154 for (Node targetNode : targetNodes){
156 InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class,targetNode.getKey()).toInstance();
157 NodeRef targetNodeRef = new NodeRef(targetInstanceId);
161 sendAggregateFlowsStatsFromAllTablesRequest(targetNode.getKey());
163 sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
166 spLogger.error("Exception occured while sending flow statistics request : {}",e);
169 if(targetNode.getAugmentation(FlowCapableNode.class) != null){
171 spLogger.info("Send request for stats collection to node : {})",targetNode.getId());
174 sendAllGroupStatisticsRequest(targetNodeRef);
176 sendAllMeterStatisticsRequest(targetNodeRef);
178 sendGroupDescriptionRequest(targetNodeRef);
180 sendMeterConfigStatisticsRequest(targetNodeRef);
183 spLogger.error("Exception occured while sending statistics request : {}", e);
189 private void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode){
190 final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
191 new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
193 input.setNode(targetNode);
195 @SuppressWarnings("unused")
196 Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
197 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
201 private void sendAggregateFlowsStatsFromAllTablesRequest(NodeKey targetNodeKey) throws InterruptedException, ExecutionException{
203 List<Short> tablesId = getTablesFromNode(targetNodeKey);
205 if(tablesId.size() != 0){
206 for(Short id : tablesId){
208 spLogger.info("Send aggregate stats request for flow table {} to node {}",id,targetNodeKey);
209 GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
210 new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
212 input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
213 input.setTableId(new TableId(id));
214 Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
215 flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
217 multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), id);
221 //Note: Just for testing, because i am not able to fetch table list from datastore
222 // Bug-225 is raised for investigation.
224 // spLogger.info("Send aggregate stats request for flow table {} to node {}",1,targetNodeKey);
225 // GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
226 // new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
228 // input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
229 // input.setTableId(new TableId((short)1));
230 // Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
231 // flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());`
233 // multipartMessageManager.setTxIdAndTableIdMapEntry(response.get().getResult().getTransactionId(), (short)1);
236 private void sendAllGroupStatisticsRequest(NodeRef targetNode){
238 final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
240 input.setNode(targetNode);
242 @SuppressWarnings("unused")
243 Future<RpcResult<GetAllGroupStatisticsOutput>> response =
244 groupStatsService.getAllGroupStatistics(input.build());
247 private void sendGroupDescriptionRequest(NodeRef targetNode){
248 final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
250 input.setNode(targetNode);
252 @SuppressWarnings("unused")
253 Future<RpcResult<GetGroupDescriptionOutput>> response =
254 groupStatsService.getGroupDescription(input.build());
257 private void sendAllMeterStatisticsRequest(NodeRef targetNode){
259 GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
261 input.setNode(targetNode);
263 @SuppressWarnings("unused")
264 Future<RpcResult<GetAllMeterStatisticsOutput>> response =
265 meterStatsService.getAllMeterStatistics(input.build());
268 private void sendMeterConfigStatisticsRequest(NodeRef targetNode){
270 GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
272 input.setNode(targetNode);
274 @SuppressWarnings("unused")
275 Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
276 meterStatsService.getAllMeterConfigStatistics(input.build());
280 public ConcurrentMap<NodeId, NodeStatistics> getStatisticsCache() {
281 return statisticsCache;
284 private List<Node> getAllConnectedNodes(){
286 Nodes nodes = (Nodes) dps.readOperationalData(nodesIdentifier);
290 spLogger.info("Number of connected nodes : {}",nodes.getNode().size());
291 return nodes.getNode();
294 private List<Short> getTablesFromNode(NodeKey nodeKey){
295 InstanceIdentifier<FlowCapableNode> nodesIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class,nodeKey).augmentation(FlowCapableNode.class).toInstance();
297 FlowCapableNode node = (FlowCapableNode)dps.readConfigurationData(nodesIdentifier);
298 List<Short> tablesId = new ArrayList<Short>();
299 if(node != null && node.getTable()!=null){
300 spLogger.info("Number of tables {} supported by node {}",node.getTable().size(),nodeKey);
301 for(Table table: node.getTable()){
302 tablesId.add(table.getId());
308 @SuppressWarnings("deprecation")
313 spLogger.info("Statistics Provider stopped.");
314 if (this.listenerRegistration != null) {
316 this.listenerRegistration.close();
318 this.statisticsRequesterThread.destroy();
321 } catch (Throwable e) {
322 throw Exceptions.sneakyThrow(e);