2 * Copyright IBM Corporation, 2013. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.statistics.manager;
10 import org.opendaylight.controller.md.statistics.manager.NodeStatisticsAger.FlowEntry;
11 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
12 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
13 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
14 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdate;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupDescStatsUpdated;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupFeaturesUpdated;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupStatisticsUpdated;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterConfigStatsUpdated;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterFeaturesUpdated;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterStatisticsUpdated;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.QueueStatisticsUpdate;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
50 * Class implement statistics manager related listener interface and augment all the
51 * received statistics data to data stores.
52 * TODO: Need to add error message listener and clean-up the associated tx id
53 * if it exists in the tx-id cache.
57 public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsListener,
58 OpendaylightMeterStatisticsListener,
59 OpendaylightFlowStatisticsListener,
60 OpendaylightPortStatisticsListener,
61 OpendaylightFlowTableStatisticsListener,
62 OpendaylightQueueStatisticsListener{
64 private final static Logger sucLogger = LoggerFactory.getLogger(StatisticsUpdateCommiter.class);
66 private final StatisticsProvider statisticsManager;
67 private final MultipartMessageManager messageManager;
69 private int unaccountedFlowsCounter = 1;
75 public StatisticsUpdateCommiter(final StatisticsProvider manager){
76 this.statisticsManager = manager;
77 this.messageManager = this.statisticsManager.getMultipartMessageManager();
81 public void onMeterConfigStatsUpdated(final MeterConfigStatsUpdated notification) {
82 //Check if response is for the request statistics-manager sent.
83 if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
86 //Add statistics to local cache
87 final NodeStatisticsAger sna = this.statisticsManager.getStatisticsHandler(notification.getId());
89 sna.updateMeterConfigStats(notification.getMeterConfigStats());
94 public void onMeterStatisticsUpdated(MeterStatisticsUpdated notification) {
95 //Check if response is for the request statistics-manager sent.
96 if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
99 //Add statistics to local cache
100 final NodeStatisticsAger nsa = this.statisticsManager.getStatisticsHandler(notification.getId());
102 nsa.updateMeterStats(notification.getMeterStats());
107 public void onGroupDescStatsUpdated(GroupDescStatsUpdated notification) {
108 //Check if response is for the request statistics-manager sent.
109 if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
112 final NodeStatisticsAger nsa = statisticsManager.getStatisticsHandler(notification.getId());
114 nsa.updateGroupDescStats(notification.getGroupDescStats());
119 public void onGroupStatisticsUpdated(GroupStatisticsUpdated notification) {
120 //Check if response is for the request statistics-manager sent.
121 if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
124 final NodeStatisticsAger nsa = statisticsManager.getStatisticsHandler(notification.getId());
126 nsa.updateGroupStats(notification.getGroupStats());
131 public void onMeterFeaturesUpdated(MeterFeaturesUpdated notification) {
132 final NodeStatisticsAger sna = this.statisticsManager.getStatisticsHandler(notification.getId());
134 sna.updateMeterFeatures(notification);
139 public void onGroupFeaturesUpdated(GroupFeaturesUpdated notification) {
140 final NodeStatisticsAger sna = this.statisticsManager.getStatisticsHandler(notification.getId());
142 sna.updateGroupFeatures(notification);
147 public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
148 //Check if response is for the request statistics-manager sent.
149 if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
152 sucLogger.debug("Received flow stats update : {}",notification.toString());
154 final NodeKey key = new NodeKey(notification.getId());
155 final NodeStatisticsAger nsa = this.statisticsManager.getStatisticsHandler(key.getId());
156 DataModificationTransaction it = this.statisticsManager.startChange();
158 for(FlowAndStatisticsMapList map: notification.getFlowAndStatisticsMapList()){
159 short tableId = map.getTableId();
162 boolean foundOriginalFlow = false;
164 FlowBuilder flowBuilder = new FlowBuilder();
166 FlowStatisticsDataBuilder flowStatisticsData = new FlowStatisticsDataBuilder();
168 FlowBuilder flow = new FlowBuilder();
169 flow.setContainerName(map.getContainerName());
170 flow.setBufferId(map.getBufferId());
171 flow.setCookie(map.getCookie());
172 flow.setCookieMask(map.getCookieMask());
173 flow.setFlags(map.getFlags());
174 flow.setFlowName(map.getFlowName());
175 flow.setHardTimeout(map.getHardTimeout());
176 if(map.getFlowId() != null)
177 flow.setId(new FlowId(map.getFlowId().getValue()));
178 flow.setIdleTimeout(map.getIdleTimeout());
179 flow.setInstallHw(map.isInstallHw());
180 flow.setInstructions(map.getInstructions());
181 if(map.getFlowId()!= null)
182 flow.setKey(new FlowKey(new FlowId(map.getKey().getFlowId().getValue())));
183 flow.setMatch(map.getMatch());
184 flow.setOutGroup(map.getOutGroup());
185 flow.setOutPort(map.getOutPort());
186 flow.setPriority(map.getPriority());
187 flow.setStrict(map.isStrict());
188 flow.setTableId(tableId);
190 Flow flowRule = flow.build();
192 FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder();
193 stats.setByteCount(map.getByteCount());
194 stats.setPacketCount(map.getPacketCount());
195 stats.setDuration(map.getDuration());
197 GenericStatistics flowStats = stats.build();
199 //Augment the data to the flow node
201 FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder();
202 flowStatistics.setByteCount(flowStats.getByteCount());
203 flowStatistics.setPacketCount(flowStats.getPacketCount());
204 flowStatistics.setDuration(flowStats.getDuration());
205 flowStatistics.setContainerName(map.getContainerName());
206 flowStatistics.setBufferId(map.getBufferId());
207 flowStatistics.setCookie(map.getCookie());
208 flowStatistics.setCookieMask(map.getCookieMask());
209 flowStatistics.setFlags(map.getFlags());
210 flowStatistics.setFlowName(map.getFlowName());
211 flowStatistics.setHardTimeout(map.getHardTimeout());
212 flowStatistics.setIdleTimeout(map.getIdleTimeout());
213 flowStatistics.setInstallHw(map.isInstallHw());
214 flowStatistics.setInstructions(map.getInstructions());
215 flowStatistics.setMatch(map.getMatch());
216 flowStatistics.setOutGroup(map.getOutGroup());
217 flowStatistics.setOutPort(map.getOutPort());
218 flowStatistics.setPriority(map.getPriority());
219 flowStatistics.setStrict(map.isStrict());
220 flowStatistics.setTableId(tableId);
222 flowStatisticsData.setFlowStatistics(flowStatistics.build());
224 sucLogger.debug("Flow : {}",flowRule.toString());
225 sucLogger.debug("Statistics to augment : {}",flowStatistics.build().toString());
227 InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
228 .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
230 Table table= (Table)it.readConfigurationData(tableRef);
232 //TODO: Not a good way to do it, need to figure out better way.
233 //TODO: major issue in any alternate approach is that flow key is incrementally assigned
234 //to the flows stored in data store.
235 // Augment same statistics to all the matching masked flow
238 for(Flow existingFlow : table.getFlow()){
239 sucLogger.debug("Existing flow in data store : {}",existingFlow.toString());
240 if(FlowComparator.flowEquals(flowRule,existingFlow)){
241 InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
242 .augmentation(FlowCapableNode.class)
243 .child(Table.class, new TableKey(tableId))
244 .child(Flow.class,existingFlow.getKey()).toInstance();
245 flowBuilder.setKey(existingFlow.getKey());
246 flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
247 sucLogger.debug("Found matching flow in the datastore, augmenting statistics");
248 foundOriginalFlow = true;
249 // Update entry with timestamp of latest response
250 flow.setKey(existingFlow.getKey());
251 FlowEntry flowStatsEntry = nsa.new FlowEntry(tableId,flow.build());
252 nsa.updateFlowStats(flowStatsEntry);
254 it.putOperationalData(flowRef, flowBuilder.build());
259 table= (Table)it.readOperationalData(tableRef);
260 if(!foundOriginalFlow && table != null){
262 for(Flow existingFlow : table.getFlow()){
263 FlowStatisticsData augmentedflowStatisticsData = existingFlow.getAugmentation(FlowStatisticsData.class);
264 if(augmentedflowStatisticsData != null){
265 FlowBuilder existingOperationalFlow = new FlowBuilder();
266 existingOperationalFlow.fieldsFrom(augmentedflowStatisticsData.getFlowStatistics());
267 sucLogger.debug("Existing unaccounted flow in operational data store : {}",existingFlow.toString());
268 if(FlowComparator.flowEquals(flowRule,existingOperationalFlow.build())){
269 InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
270 .augmentation(FlowCapableNode.class)
271 .child(Table.class, new TableKey(tableId))
272 .child(Flow.class,existingFlow.getKey()).toInstance();
273 flowBuilder.setKey(existingFlow.getKey());
274 flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
275 sucLogger.debug("Found matching unaccounted flow in the operational datastore, augmenting statistics");
276 foundOriginalFlow = true;
278 // Update entry with timestamp of latest response
279 flow.setKey(existingFlow.getKey());
280 FlowEntry flowStatsEntry = nsa.new FlowEntry(tableId,flow.build());
281 nsa.updateFlowStats(flowStatsEntry);
283 it.putOperationalData(flowRef, flowBuilder.build());
289 if(!foundOriginalFlow){
290 String flowKey = "#UF$TABLE*"+Short.toString(tableId)+"*"+Integer.toString(this.unaccountedFlowsCounter);
291 this.unaccountedFlowsCounter++;
292 FlowKey newFlowKey = new FlowKey(new FlowId(flowKey));
293 InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
294 .augmentation(FlowCapableNode.class)
295 .child(Table.class, new TableKey(tableId))
296 .child(Flow.class,newFlowKey).toInstance();
297 flowBuilder.setKey(newFlowKey);
298 flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
299 sucLogger.debug("Flow {} is not present in config data store, augmenting statistics as an unaccounted flow",flowBuilder.build());
301 // Update entry with timestamp of latest response
302 flow.setKey(newFlowKey);
303 FlowEntry flowStatsEntry = nsa.new FlowEntry(tableId,flow.build());
304 nsa.updateFlowStats(flowStatsEntry);
306 it.putOperationalData(flowRef, flowBuilder.build());
313 public void onAggregateFlowStatisticsUpdate(AggregateFlowStatisticsUpdate notification) {
314 //Check if response is for the request statistics-manager sent.
315 if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
318 final NodeStatisticsAger nsa = this.statisticsManager.getStatisticsHandler(notification.getId());
320 final Short tableId = messageManager.getTableIdForTxId(notification.getId(),notification.getTransactionId());
321 nsa.updateAggregateFlowStats(tableId, notification);
326 public void onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate notification) {
327 //Check if response is for the request statistics-manager sent.
328 if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
331 final NodeStatisticsAger nsa = this.statisticsManager.getStatisticsHandler(notification.getId());
333 nsa.updateNodeConnectorStats(notification.getNodeConnectorStatisticsAndPortNumberMap());
338 public void onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate notification) {
339 //Check if response is for the request statistics-manager sent.
340 if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
343 final NodeStatisticsAger nsa = this.statisticsManager.getStatisticsHandler(notification.getId());
345 nsa.updateFlowTableStats(notification.getFlowTableAndStatisticsMap());
350 public void onQueueStatisticsUpdate(QueueStatisticsUpdate notification) {
351 //Check if response is for the request statistics-manager sent.
352 if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
355 //Add statistics to local cache
356 final NodeStatisticsAger nsa = this.statisticsManager.getStatisticsHandler(notification.getId());
358 nsa.updateQueueStats(notification.getQueueIdAndStatisticsMap());