Merge "Move queue/meter/flow listeners into their trackers"
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / FlowStatsTracker.java
1 /*
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.statistics.manager;
9
10 import java.util.Map.Entry;
11
12 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
13 import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
14 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
15 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
33 import org.opendaylight.yangtools.yang.binding.DataObject;
34 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 final class FlowStatsTracker extends AbstractListeningStatsTracker<FlowAndStatisticsMapList, FlowStatsEntry> {
39     private static final Logger logger = LoggerFactory.getLogger(FlowStatsTracker.class);
40     private final OpendaylightFlowStatisticsService flowStatsService;
41     private int unaccountedFlowsCounter = 1;
42
43     FlowStatsTracker(OpendaylightFlowStatisticsService flowStatsService, final FlowCapableContext context, long lifetimeNanos) {
44         super(context, lifetimeNanos);
45         this.flowStatsService = flowStatsService;
46     }
47
48     @Override
49     protected void cleanupSingleStat(DataModificationTransaction trans, FlowStatsEntry item) {
50         InstanceIdentifier<?> flowRef = getNodeIdentifierBuilder()
51                             .augmentation(FlowCapableNode.class)
52                             .child(Table.class, new TableKey(item.getTableId()))
53                             .child(Flow.class,item.getFlow().getKey())
54                             .augmentation(FlowStatisticsData.class).toInstance();
55         trans.removeOperationalData(flowRef);
56     }
57
58     @Override
59     protected FlowStatsEntry updateSingleStat(DataModificationTransaction trans, FlowAndStatisticsMapList map) {
60         short tableId = map.getTableId();
61
62         FlowBuilder flowBuilder = new FlowBuilder();
63
64         FlowStatisticsDataBuilder flowStatisticsData = new FlowStatisticsDataBuilder();
65
66         FlowBuilder flow = new FlowBuilder();
67         flow.setContainerName(map.getContainerName());
68         flow.setBufferId(map.getBufferId());
69         flow.setCookie(map.getCookie());
70         flow.setCookieMask(map.getCookieMask());
71         flow.setFlags(map.getFlags());
72         flow.setFlowName(map.getFlowName());
73         flow.setHardTimeout(map.getHardTimeout());
74         if(map.getFlowId() != null)
75             flow.setId(new FlowId(map.getFlowId().getValue()));
76         flow.setIdleTimeout(map.getIdleTimeout());
77         flow.setInstallHw(map.isInstallHw());
78         flow.setInstructions(map.getInstructions());
79         if(map.getFlowId()!= null)
80             flow.setKey(new FlowKey(new FlowId(map.getKey().getFlowId().getValue())));
81         flow.setMatch(map.getMatch());
82         flow.setOutGroup(map.getOutGroup());
83         flow.setOutPort(map.getOutPort());
84         flow.setPriority(map.getPriority());
85         flow.setStrict(map.isStrict());
86         flow.setTableId(tableId);
87
88         Flow flowRule = flow.build();
89
90         FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder();
91         stats.setByteCount(map.getByteCount());
92         stats.setPacketCount(map.getPacketCount());
93         stats.setDuration(map.getDuration());
94
95         GenericStatistics flowStats = stats.build();
96
97         //Augment the data to the flow node
98
99         FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder();
100         flowStatistics.setByteCount(flowStats.getByteCount());
101         flowStatistics.setPacketCount(flowStats.getPacketCount());
102         flowStatistics.setDuration(flowStats.getDuration());
103         flowStatistics.setContainerName(map.getContainerName());
104         flowStatistics.setBufferId(map.getBufferId());
105         flowStatistics.setCookie(map.getCookie());
106         flowStatistics.setCookieMask(map.getCookieMask());
107         flowStatistics.setFlags(map.getFlags());
108         flowStatistics.setFlowName(map.getFlowName());
109         flowStatistics.setHardTimeout(map.getHardTimeout());
110         flowStatistics.setIdleTimeout(map.getIdleTimeout());
111         flowStatistics.setInstallHw(map.isInstallHw());
112         flowStatistics.setInstructions(map.getInstructions());
113         flowStatistics.setMatch(map.getMatch());
114         flowStatistics.setOutGroup(map.getOutGroup());
115         flowStatistics.setOutPort(map.getOutPort());
116         flowStatistics.setPriority(map.getPriority());
117         flowStatistics.setStrict(map.isStrict());
118         flowStatistics.setTableId(tableId);
119
120         flowStatisticsData.setFlowStatistics(flowStatistics.build());
121
122         logger.debug("Flow : {}",flowRule.toString());
123         logger.debug("Statistics to augment : {}",flowStatistics.build().toString());
124
125         InstanceIdentifier<Table> tableRef = getNodeIdentifierBuilder()
126                 .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
127
128         //TODO: Not a good way to do it, need to figure out better way.
129         //TODO: major issue in any alternate approach is that flow key is incrementally assigned
130         //to the flows stored in data store.
131         // Augment same statistics to all the matching masked flow
132         Table table= (Table)trans.readConfigurationData(tableRef);
133         if(table != null){
134             for(Flow existingFlow : table.getFlow()){
135                 logger.debug("Existing flow in data store : {}",existingFlow.toString());
136                 if(FlowComparator.flowEquals(flowRule,existingFlow)){
137                     InstanceIdentifier<Flow> flowRef = getNodeIdentifierBuilder()
138                             .augmentation(FlowCapableNode.class)
139                             .child(Table.class, new TableKey(tableId))
140                             .child(Flow.class,existingFlow.getKey()).toInstance();
141                     flowBuilder.setKey(existingFlow.getKey());
142                     flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
143                     logger.debug("Found matching flow in the datastore, augmenting statistics");
144                     // Update entry with timestamp of latest response
145                     flow.setKey(existingFlow.getKey());
146                     FlowStatsEntry flowStatsEntry = new FlowStatsEntry(tableId,flow.build());
147                     trans.putOperationalData(flowRef, flowBuilder.build());
148                     return flowStatsEntry;
149                 }
150             }
151         }
152
153         table = (Table)trans.readOperationalData(tableRef);
154         if(table != null){
155             for(Flow existingFlow : table.getFlow()){
156                 FlowStatisticsData augmentedflowStatisticsData = existingFlow.getAugmentation(FlowStatisticsData.class);
157                 if(augmentedflowStatisticsData != null){
158                     FlowBuilder existingOperationalFlow = new FlowBuilder();
159                     existingOperationalFlow.fieldsFrom(augmentedflowStatisticsData.getFlowStatistics());
160                     logger.debug("Existing unaccounted flow in operational data store : {}",existingFlow.toString());
161                     if(FlowComparator.flowEquals(flowRule,existingOperationalFlow.build())){
162                         InstanceIdentifier<Flow> flowRef = getNodeIdentifierBuilder()
163                                 .augmentation(FlowCapableNode.class)
164                                 .child(Table.class, new TableKey(tableId))
165                                 .child(Flow.class,existingFlow.getKey()).toInstance();
166                         flowBuilder.setKey(existingFlow.getKey());
167                         flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
168                         logger.debug("Found matching unaccounted flow in the operational datastore, augmenting statistics");
169                         // Update entry with timestamp of latest response
170                         flow.setKey(existingFlow.getKey());
171                         FlowStatsEntry flowStatsEntry = new FlowStatsEntry(tableId,flow.build());
172                         trans.putOperationalData(flowRef, flowBuilder.build());
173                         return flowStatsEntry;
174                     }
175                 }
176             }
177         }
178
179         String flowKey = "#UF$TABLE*"+Short.toString(tableId)+"*"+Integer.toString(this.unaccountedFlowsCounter);
180         this.unaccountedFlowsCounter++;
181         FlowKey newFlowKey = new FlowKey(new FlowId(flowKey));
182         InstanceIdentifier<Flow> flowRef = getNodeIdentifierBuilder().augmentation(FlowCapableNode.class)
183                     .child(Table.class, new TableKey(tableId))
184                     .child(Flow.class,newFlowKey).toInstance();
185         flowBuilder.setKey(newFlowKey);
186         flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
187         logger.debug("Flow {} is not present in config data store, augmenting statistics as an unaccounted flow",
188                     flowBuilder.build());
189
190         // Update entry with timestamp of latest response
191         flow.setKey(newFlowKey);
192         FlowStatsEntry flowStatsEntry = new FlowStatsEntry(tableId,flow.build());
193         trans.putOperationalData(flowRef, flowBuilder.build());
194         return flowStatsEntry;
195     }
196
197     @Override
198     protected InstanceIdentifier<?> listenPath() {
199         return getNodeIdentifierBuilder().augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class).build();
200     }
201
202     @Override
203     protected String statName() {
204         return "Flow";
205     }
206
207     public void requestAllFlowsAllTables() {
208         if (flowStatsService != null) {
209             final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input = new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
210             input.setNode(getNodeRef());
211
212             requestHelper(flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build()), StatsRequestType.ALL_FLOW);
213         }
214     }
215
216     public void requestAggregateFlows(final TableKey key) {
217         if (flowStatsService != null) {
218             GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
219                     new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
220
221             input.setNode(getNodeRef());
222             input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(key.getId()));
223             requestHelper(flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build()), StatsRequestType.ALL_FLOW);
224         }
225     }
226
227     public void requestFlow(final Flow flow) {
228         if (flowStatsService != null) {
229             final GetFlowStatisticsFromFlowTableInputBuilder input =
230                     new GetFlowStatisticsFromFlowTableInputBuilder(flow);
231             input.setNode(getNodeRef());
232
233             requestHelper(flowStatsService.getFlowStatisticsFromFlowTable(input.build()), StatsRequestType.ALL_FLOW);
234         }
235     }
236
237     @Override
238     public void onDataChanged(DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
239         for (Entry<InstanceIdentifier<?>, DataObject> e : change.getCreatedConfigurationData().entrySet()) {
240             if (Flow.class.equals(e.getKey().getTargetType())) {
241                 final Flow flow = (Flow) e.getValue();
242                 logger.debug("Key {} triggered request for flow {}", e.getKey(), flow);
243                 requestFlow(flow);
244             } else {
245                 logger.debug("Ignoring key {}", e.getKey());
246             }
247         }
248
249         final DataModificationTransaction trans = startTransaction();
250         for (InstanceIdentifier<?> key : change.getRemovedConfigurationData()) {
251             if (Flow.class.equals(key.getTargetType())) {
252                 @SuppressWarnings("unchecked")
253                 final InstanceIdentifier<Flow> flow = (InstanceIdentifier<Flow>)key;
254                 final InstanceIdentifier<?> del = InstanceIdentifier.builder(flow)
255                         .augmentation(FlowStatisticsData.class).build();
256                 logger.debug("Key {} triggered remove of augmentation {}", key, del);
257
258                 trans.removeOperationalData(del);
259             }
260         }
261         trans.commit();
262     }
263
264     @Override
265     public void start(final DataBrokerService dbs) {
266         if (flowStatsService == null) {
267             logger.debug("No Flow Statistics service, not subscribing to flows on node {}", getNodeIdentifier());
268             return;
269         }
270
271         super.start(dbs);
272     }
273 }