Merge "Bug 849: Fixed NPE in Translated Data Change Events."
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / FlowStatsTracker.java
1 /*
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.statistics.manager;
9
10 import java.util.Collection;
11 import java.util.Map.Entry;
12
13 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
14 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
15 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
33 import org.opendaylight.yangtools.yang.binding.DataObject;
34 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 final class FlowStatsTracker extends AbstractListeningStatsTracker<FlowAndStatisticsMapList, FlowStatsEntry> {
39     private static final Logger logger = LoggerFactory.getLogger(FlowStatsTracker.class);
40     private final OpendaylightFlowStatisticsService flowStatsService;
41     private FlowTableStatsTracker flowTableStats;
42     private int unaccountedFlowsCounter = 1;
43
44     FlowStatsTracker(final OpendaylightFlowStatisticsService flowStatsService, final FlowCapableContext context) {
45         super(context);
46         this.flowStatsService = flowStatsService;
47     }
48     FlowStatsTracker(final OpendaylightFlowStatisticsService flowStatsService, final FlowCapableContext context, final FlowTableStatsTracker flowTableStats) {
49         this(flowStatsService, context);
50         this.flowTableStats = flowTableStats;
51     }
52
53     @Override
54     protected void cleanupSingleStat(final DataModificationTransaction trans, final FlowStatsEntry item) {
55         InstanceIdentifier<?> flowRef = getNodeIdentifierBuilder()
56                             .augmentation(FlowCapableNode.class)
57                             .child(Table.class, new TableKey(item.getTableId()))
58                             .child(Flow.class,item.getFlow().getKey())
59                             .augmentation(FlowStatisticsData.class).toInstance();
60         trans.removeOperationalData(flowRef);
61     }
62
63     @Override
64     protected FlowStatsEntry updateSingleStat(final DataModificationTransaction trans, final FlowAndStatisticsMapList map) {
65         short tableId = map.getTableId();
66
67         FlowBuilder flowBuilder = new FlowBuilder();
68
69         FlowStatisticsDataBuilder flowStatisticsData = new FlowStatisticsDataBuilder();
70
71         FlowBuilder flow = new FlowBuilder();
72         flow.setContainerName(map.getContainerName());
73         flow.setBufferId(map.getBufferId());
74         flow.setCookie(map.getCookie());
75         flow.setCookieMask(map.getCookieMask());
76         flow.setFlags(map.getFlags());
77         flow.setFlowName(map.getFlowName());
78         flow.setHardTimeout(map.getHardTimeout());
79         if(map.getFlowId() != null)
80             flow.setId(new FlowId(map.getFlowId().getValue()));
81         flow.setIdleTimeout(map.getIdleTimeout());
82         flow.setInstallHw(map.isInstallHw());
83         flow.setInstructions(map.getInstructions());
84         if(map.getFlowId()!= null)
85             flow.setKey(new FlowKey(new FlowId(map.getKey().getFlowId().getValue())));
86         flow.setMatch(map.getMatch());
87         flow.setOutGroup(map.getOutGroup());
88         flow.setOutPort(map.getOutPort());
89         flow.setPriority(map.getPriority());
90         flow.setStrict(map.isStrict());
91         flow.setTableId(tableId);
92
93         Flow flowRule = flow.build();
94
95         FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder();
96         stats.setByteCount(map.getByteCount());
97         stats.setPacketCount(map.getPacketCount());
98         stats.setDuration(map.getDuration());
99
100         GenericStatistics flowStats = stats.build();
101
102         //Augment the data to the flow node
103
104         FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder();
105         flowStatistics.setByteCount(flowStats.getByteCount());
106         flowStatistics.setPacketCount(flowStats.getPacketCount());
107         flowStatistics.setDuration(flowStats.getDuration());
108         flowStatistics.setContainerName(map.getContainerName());
109         flowStatistics.setBufferId(map.getBufferId());
110         flowStatistics.setCookie(map.getCookie());
111         flowStatistics.setCookieMask(map.getCookieMask());
112         flowStatistics.setFlags(map.getFlags());
113         flowStatistics.setFlowName(map.getFlowName());
114         flowStatistics.setHardTimeout(map.getHardTimeout());
115         flowStatistics.setIdleTimeout(map.getIdleTimeout());
116         flowStatistics.setInstallHw(map.isInstallHw());
117         flowStatistics.setInstructions(map.getInstructions());
118         flowStatistics.setMatch(map.getMatch());
119         flowStatistics.setOutGroup(map.getOutGroup());
120         flowStatistics.setOutPort(map.getOutPort());
121         flowStatistics.setPriority(map.getPriority());
122         flowStatistics.setStrict(map.isStrict());
123         flowStatistics.setTableId(tableId);
124
125         flowStatisticsData.setFlowStatistics(flowStatistics.build());
126
127         logger.debug("Flow : {}",flowRule.toString());
128         logger.debug("Statistics to augment : {}",flowStatistics.build().toString());
129
130         InstanceIdentifier<Table> tableRef = getNodeIdentifierBuilder()
131                 .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
132
133         //TODO: Not a good way to do it, need to figure out better way.
134         //TODO: major issue in any alternate approach is that flow key is incrementally assigned
135         //to the flows stored in data store.
136         // Augment same statistics to all the matching masked flow
137         Table table= (Table)trans.readConfigurationData(tableRef);
138         if(table != null){
139             for(Flow existingFlow : table.getFlow()){
140                 logger.debug("Existing flow in data store : {}",existingFlow.toString());
141                 if(FlowComparator.flowEquals(flowRule,existingFlow)){
142                     InstanceIdentifier<Flow> flowRef = getNodeIdentifierBuilder()
143                             .augmentation(FlowCapableNode.class)
144                             .child(Table.class, new TableKey(tableId))
145                             .child(Flow.class,existingFlow.getKey()).toInstance();
146                     flowBuilder.setKey(existingFlow.getKey());
147                     flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
148                     logger.debug("Found matching flow in the datastore, augmenting statistics");
149                     // Update entry with timestamp of latest response
150                     flow.setKey(existingFlow.getKey());
151                     FlowStatsEntry flowStatsEntry = new FlowStatsEntry(tableId,flow.build());
152                     trans.putOperationalData(flowRef, flowBuilder.build());
153                     return flowStatsEntry;
154                 }
155             }
156         }
157
158         table = (Table)trans.readOperationalData(tableRef);
159         if(table != null){
160             for(Flow existingFlow : table.getFlow()){
161                 FlowStatisticsData augmentedflowStatisticsData = existingFlow.getAugmentation(FlowStatisticsData.class);
162                 if(augmentedflowStatisticsData != null){
163                     FlowBuilder existingOperationalFlow = new FlowBuilder();
164                     existingOperationalFlow.fieldsFrom(augmentedflowStatisticsData.getFlowStatistics());
165                     logger.debug("Existing unaccounted flow in operational data store : {}",existingFlow.toString());
166                     if(FlowComparator.flowEquals(flowRule,existingOperationalFlow.build())){
167                         InstanceIdentifier<Flow> flowRef = getNodeIdentifierBuilder()
168                                 .augmentation(FlowCapableNode.class)
169                                 .child(Table.class, new TableKey(tableId))
170                                 .child(Flow.class,existingFlow.getKey()).toInstance();
171                         flowBuilder.setKey(existingFlow.getKey());
172                         flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
173                         logger.debug("Found matching unaccounted flow in the operational datastore, augmenting statistics");
174                         // Update entry with timestamp of latest response
175                         flow.setKey(existingFlow.getKey());
176                         FlowStatsEntry flowStatsEntry = new FlowStatsEntry(tableId,flow.build());
177                         trans.putOperationalData(flowRef, flowBuilder.build());
178                         return flowStatsEntry;
179                     }
180                 }
181             }
182         }
183
184         String flowKey = "#UF$TABLE*"+Short.toString(tableId)+"*"+Integer.toString(this.unaccountedFlowsCounter);
185         this.unaccountedFlowsCounter++;
186         FlowKey newFlowKey = new FlowKey(new FlowId(flowKey));
187         InstanceIdentifier<Flow> flowRef = getNodeIdentifierBuilder().augmentation(FlowCapableNode.class)
188                     .child(Table.class, new TableKey(tableId))
189                     .child(Flow.class,newFlowKey).toInstance();
190         flowBuilder.setKey(newFlowKey);
191         flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
192         logger.debug("Flow {} is not present in config data store, augmenting statistics as an unaccounted flow",
193                     flowBuilder.build());
194
195         // Update entry with timestamp of latest response
196         flow.setKey(newFlowKey);
197         FlowStatsEntry flowStatsEntry = new FlowStatsEntry(tableId,flow.build());
198         trans.putOperationalData(flowRef, flowBuilder.build());
199         return flowStatsEntry;
200     }
201
202     @Override
203     protected InstanceIdentifier<?> listenPath() {
204         return getNodeIdentifierBuilder().augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class).build();
205     }
206
207     @Override
208     protected String statName() {
209         return "Flow";
210     }
211
212     @Override
213     public void request() {
214         // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
215         //        comes back -- we do not have any tables anyway.
216         final Collection<TableKey> tables = flowTableStats.getTables();
217         logger.debug("Node {} supports {} table(s)", this.getNodeRef(), tables.size());
218         for (final TableKey key : tables) {
219             logger.debug("Send aggregate stats request for flow table {} to node {}", key.getId(), this.getNodeRef());
220             this.requestAggregateFlows(key);
221         }
222
223         this.requestAllFlowsAllTables();
224
225     }
226     public void requestAllFlowsAllTables() {
227         if (flowStatsService != null) {
228             final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input = new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
229             input.setNode(getNodeRef());
230
231             requestHelper(flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build()));
232         }
233     }
234
235     public void requestAggregateFlows(final TableKey key) {
236         if (flowStatsService != null) {
237             GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
238                     new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
239
240             input.setNode(getNodeRef());
241             input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(key.getId()));
242             requestHelper(flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build()));
243         }
244     }
245
246     public void requestFlow(final Flow flow) {
247         if (flowStatsService != null) {
248             final GetFlowStatisticsFromFlowTableInputBuilder input =
249                     new GetFlowStatisticsFromFlowTableInputBuilder(flow);
250             input.setNode(getNodeRef());
251
252             requestHelper(flowStatsService.getFlowStatisticsFromFlowTable(input.build()));
253         }
254     }
255
256     @Override
257     public void onDataChanged(final DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
258         for (Entry<InstanceIdentifier<?>, DataObject> e : change.getCreatedConfigurationData().entrySet()) {
259             if (Flow.class.equals(e.getKey().getTargetType())) {
260                 final Flow flow = (Flow) e.getValue();
261                 logger.debug("Key {} triggered request for flow {}", e.getKey(), flow);
262                 requestFlow(flow);
263             } else {
264                 logger.debug("Ignoring key {}", e.getKey());
265             }
266         }
267
268         final DataModificationTransaction trans = startTransaction();
269         for (InstanceIdentifier<?> key : change.getRemovedConfigurationData()) {
270             if (Flow.class.equals(key.getTargetType())) {
271                 @SuppressWarnings("unchecked")
272                 final InstanceIdentifier<Flow> flow = (InstanceIdentifier<Flow>)key;
273                 logger.debug("Key {} triggered remove of Flow from operational space.", key);
274                 trans.removeOperationalData(flow);
275             }
276         }
277         trans.commit();
278     }
279
280     @Override
281     public void start(final DataBrokerService dbs) {
282         if (flowStatsService == null) {
283             logger.debug("No Flow Statistics service, not subscribing to flows on node {}", getNodeIdentifier());
284             return;
285         }
286
287         super.start(dbs);
288     }
289 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.