Move flow comparison methods into utility class
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / StatisticsUpdateCommiter.java
1 /*
2  * Copyright IBM Corporation, 2013.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.statistics.manager;
9
10 import org.opendaylight.controller.md.statistics.manager.NodeStatisticsAger.FlowEntry;
11 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
12 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
13 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
14 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdate;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupDescStatsUpdated;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupFeaturesUpdated;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupStatisticsUpdated;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterConfigStatsUpdated;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterFeaturesUpdated;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterStatisticsUpdated;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.QueueStatisticsUpdate;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * Class implement statistics manager related listener interface and augment all the
51  * received statistics data to data stores.
52  * TODO: Need to add error message listener and clean-up the associated tx id
53  * if it exists in the tx-id cache.
54  * @author vishnoianil
55  *
56  */
57 public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsListener,
58         OpendaylightMeterStatisticsListener,
59         OpendaylightFlowStatisticsListener,
60         OpendaylightPortStatisticsListener,
61         OpendaylightFlowTableStatisticsListener,
62         OpendaylightQueueStatisticsListener{
63
64     private final static Logger sucLogger = LoggerFactory.getLogger(StatisticsUpdateCommiter.class);
65
66     private final StatisticsProvider statisticsManager;
67     private final MultipartMessageManager messageManager;
68
69     private int unaccountedFlowsCounter = 1;
70
71     /**
72      * default ctor
73      * @param manager
74      */
75     public StatisticsUpdateCommiter(final StatisticsProvider manager){
76         this.statisticsManager = manager;
77         this.messageManager = this.statisticsManager.getMultipartMessageManager();
78     }
79
80     @Override
81     public void onMeterConfigStatsUpdated(final MeterConfigStatsUpdated notification) {
82         //Check if response is for the request statistics-manager sent.
83         if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
84             return;
85
86         //Add statistics to local cache
87         final NodeStatisticsAger sna = this.statisticsManager.getStatisticsHandler(notification.getId());
88         if (sna != null) {
89             sna.updateMeterConfigStats(notification.getMeterConfigStats());
90         }
91     }
92
93     @Override
94     public void onMeterStatisticsUpdated(MeterStatisticsUpdated notification) {
95         //Check if response is for the request statistics-manager sent.
96         if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
97             return;
98
99         //Add statistics to local cache
100         final NodeStatisticsAger nsa = this.statisticsManager.getStatisticsHandler(notification.getId());
101         if (nsa != null) {
102             nsa.updateMeterStats(notification.getMeterStats());
103         }
104     }
105
106     @Override
107     public void onGroupDescStatsUpdated(GroupDescStatsUpdated notification) {
108         //Check if response is for the request statistics-manager sent.
109         if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
110             return;
111
112         final NodeStatisticsAger nsa = statisticsManager.getStatisticsHandler(notification.getId());
113         if (nsa != null) {
114             nsa.updateGroupDescStats(notification.getGroupDescStats());
115         }
116     }
117
118     @Override
119     public void onGroupStatisticsUpdated(GroupStatisticsUpdated notification) {
120         //Check if response is for the request statistics-manager sent.
121         if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
122             return;
123
124         final NodeStatisticsAger nsa = statisticsManager.getStatisticsHandler(notification.getId());
125         if (nsa != null) {
126             nsa.updateGroupStats(notification.getGroupStats());
127         }
128     }
129
130     @Override
131     public void onMeterFeaturesUpdated(MeterFeaturesUpdated notification) {
132         final NodeStatisticsAger sna = this.statisticsManager.getStatisticsHandler(notification.getId());
133         if (sna != null) {
134             sna.updateMeterFeatures(notification);
135         }
136     }
137
138     @Override
139     public void onGroupFeaturesUpdated(GroupFeaturesUpdated notification) {
140         final NodeStatisticsAger sna = this.statisticsManager.getStatisticsHandler(notification.getId());
141         if (sna != null) {
142             sna.updateGroupFeatures(notification);
143         }
144     }
145
146     @Override
147     public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
148         //Check if response is for the request statistics-manager sent.
149         if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
150             return;
151
152         sucLogger.debug("Received flow stats update : {}",notification.toString());
153
154         final NodeKey key = new NodeKey(notification.getId());
155         final NodeStatisticsAger nsa =  this.statisticsManager.getStatisticsHandler(key.getId());
156         DataModificationTransaction it = this.statisticsManager.startChange();
157
158         for(FlowAndStatisticsMapList map: notification.getFlowAndStatisticsMapList()){
159             short tableId = map.getTableId();
160
161
162             boolean foundOriginalFlow = false;
163
164             FlowBuilder flowBuilder = new FlowBuilder();
165
166             FlowStatisticsDataBuilder flowStatisticsData = new FlowStatisticsDataBuilder();
167
168             FlowBuilder flow = new FlowBuilder();
169             flow.setContainerName(map.getContainerName());
170             flow.setBufferId(map.getBufferId());
171             flow.setCookie(map.getCookie());
172             flow.setCookieMask(map.getCookieMask());
173             flow.setFlags(map.getFlags());
174             flow.setFlowName(map.getFlowName());
175             flow.setHardTimeout(map.getHardTimeout());
176             if(map.getFlowId() != null)
177                 flow.setId(new FlowId(map.getFlowId().getValue()));
178             flow.setIdleTimeout(map.getIdleTimeout());
179             flow.setInstallHw(map.isInstallHw());
180             flow.setInstructions(map.getInstructions());
181             if(map.getFlowId()!= null)
182                 flow.setKey(new FlowKey(new FlowId(map.getKey().getFlowId().getValue())));
183             flow.setMatch(map.getMatch());
184             flow.setOutGroup(map.getOutGroup());
185             flow.setOutPort(map.getOutPort());
186             flow.setPriority(map.getPriority());
187             flow.setStrict(map.isStrict());
188             flow.setTableId(tableId);
189
190             Flow flowRule = flow.build();
191
192             FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder();
193             stats.setByteCount(map.getByteCount());
194             stats.setPacketCount(map.getPacketCount());
195             stats.setDuration(map.getDuration());
196
197             GenericStatistics flowStats = stats.build();
198
199             //Augment the data to the flow node
200
201             FlowStatisticsBuilder flowStatistics = new FlowStatisticsBuilder();
202             flowStatistics.setByteCount(flowStats.getByteCount());
203             flowStatistics.setPacketCount(flowStats.getPacketCount());
204             flowStatistics.setDuration(flowStats.getDuration());
205             flowStatistics.setContainerName(map.getContainerName());
206             flowStatistics.setBufferId(map.getBufferId());
207             flowStatistics.setCookie(map.getCookie());
208             flowStatistics.setCookieMask(map.getCookieMask());
209             flowStatistics.setFlags(map.getFlags());
210             flowStatistics.setFlowName(map.getFlowName());
211             flowStatistics.setHardTimeout(map.getHardTimeout());
212             flowStatistics.setIdleTimeout(map.getIdleTimeout());
213             flowStatistics.setInstallHw(map.isInstallHw());
214             flowStatistics.setInstructions(map.getInstructions());
215             flowStatistics.setMatch(map.getMatch());
216             flowStatistics.setOutGroup(map.getOutGroup());
217             flowStatistics.setOutPort(map.getOutPort());
218             flowStatistics.setPriority(map.getPriority());
219             flowStatistics.setStrict(map.isStrict());
220             flowStatistics.setTableId(tableId);
221
222             flowStatisticsData.setFlowStatistics(flowStatistics.build());
223
224             sucLogger.debug("Flow : {}",flowRule.toString());
225             sucLogger.debug("Statistics to augment : {}",flowStatistics.build().toString());
226
227             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
228                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
229
230             Table table= (Table)it.readConfigurationData(tableRef);
231
232             //TODO: Not a good way to do it, need to figure out better way.
233             //TODO: major issue in any alternate approach is that flow key is incrementally assigned
234             //to the flows stored in data store.
235             // Augment same statistics to all the matching masked flow
236             if(table != null){
237
238                 for(Flow existingFlow : table.getFlow()){
239                     sucLogger.debug("Existing flow in data store : {}",existingFlow.toString());
240                     if(FlowComparator.flowEquals(flowRule,existingFlow)){
241                         InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
242                                 .augmentation(FlowCapableNode.class)
243                                 .child(Table.class, new TableKey(tableId))
244                                 .child(Flow.class,existingFlow.getKey()).toInstance();
245                         flowBuilder.setKey(existingFlow.getKey());
246                         flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
247                         sucLogger.debug("Found matching flow in the datastore, augmenting statistics");
248                         foundOriginalFlow = true;
249                         // Update entry with timestamp of latest response
250                         flow.setKey(existingFlow.getKey());
251                         FlowEntry flowStatsEntry = nsa.new FlowEntry(tableId,flow.build());
252                         nsa.updateFlowStats(flowStatsEntry);
253
254                         it.putOperationalData(flowRef, flowBuilder.build());
255                     }
256                 }
257             }
258
259             table= (Table)it.readOperationalData(tableRef);
260             if(!foundOriginalFlow && table != null){
261
262                 for(Flow existingFlow : table.getFlow()){
263                     FlowStatisticsData augmentedflowStatisticsData = existingFlow.getAugmentation(FlowStatisticsData.class);
264                     if(augmentedflowStatisticsData != null){
265                         FlowBuilder existingOperationalFlow = new FlowBuilder();
266                         existingOperationalFlow.fieldsFrom(augmentedflowStatisticsData.getFlowStatistics());
267                         sucLogger.debug("Existing unaccounted flow in operational data store : {}",existingFlow.toString());
268                         if(FlowComparator.flowEquals(flowRule,existingOperationalFlow.build())){
269                             InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
270                                     .augmentation(FlowCapableNode.class)
271                                     .child(Table.class, new TableKey(tableId))
272                                     .child(Flow.class,existingFlow.getKey()).toInstance();
273                             flowBuilder.setKey(existingFlow.getKey());
274                             flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
275                             sucLogger.debug("Found matching unaccounted flow in the operational datastore, augmenting statistics");
276                             foundOriginalFlow = true;
277
278                             // Update entry with timestamp of latest response
279                             flow.setKey(existingFlow.getKey());
280                             FlowEntry flowStatsEntry = nsa.new FlowEntry(tableId,flow.build());
281                             nsa.updateFlowStats(flowStatsEntry);
282
283                             it.putOperationalData(flowRef, flowBuilder.build());
284                             break;
285                         }
286                     }
287                 }
288             }
289             if(!foundOriginalFlow){
290                 String flowKey = "#UF$TABLE*"+Short.toString(tableId)+"*"+Integer.toString(this.unaccountedFlowsCounter);
291                 this.unaccountedFlowsCounter++;
292                 FlowKey newFlowKey = new FlowKey(new FlowId(flowKey));
293                 InstanceIdentifier<Flow> flowRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, key)
294                         .augmentation(FlowCapableNode.class)
295                         .child(Table.class, new TableKey(tableId))
296                         .child(Flow.class,newFlowKey).toInstance();
297                 flowBuilder.setKey(newFlowKey);
298                 flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
299                 sucLogger.debug("Flow {} is not present in config data store, augmenting statistics as an unaccounted flow",flowBuilder.build());
300
301                 // Update entry with timestamp of latest response
302                 flow.setKey(newFlowKey);
303                 FlowEntry flowStatsEntry = nsa.new FlowEntry(tableId,flow.build());
304                 nsa.updateFlowStats(flowStatsEntry);
305
306                 it.putOperationalData(flowRef, flowBuilder.build());
307             }
308         }
309         it.commit();
310     }
311
312     @Override
313     public void onAggregateFlowStatisticsUpdate(AggregateFlowStatisticsUpdate notification) {
314         //Check if response is for the request statistics-manager sent.
315         if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
316             return;
317
318         final NodeStatisticsAger nsa = this.statisticsManager.getStatisticsHandler(notification.getId());
319         if (nsa != null) {
320             final Short tableId = messageManager.getTableIdForTxId(notification.getId(),notification.getTransactionId());
321             nsa.updateAggregateFlowStats(tableId, notification);
322         }
323     }
324
325     @Override
326     public void onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate notification) {
327         //Check if response is for the request statistics-manager sent.
328         if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
329             return;
330
331         final NodeStatisticsAger nsa = this.statisticsManager.getStatisticsHandler(notification.getId());
332         if (nsa != null) {
333             nsa.updateNodeConnectorStats(notification.getNodeConnectorStatisticsAndPortNumberMap());
334         }
335     }
336
337     @Override
338     public void onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate notification) {
339         //Check if response is for the request statistics-manager sent.
340         if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
341             return;
342
343         final NodeStatisticsAger nsa = this.statisticsManager.getStatisticsHandler(notification.getId());
344         if (nsa != null) {
345             nsa.updateFlowTableStats(notification.getFlowTableAndStatisticsMap());
346         }
347     }
348
349     @Override
350     public void onQueueStatisticsUpdate(QueueStatisticsUpdate notification) {
351         //Check if response is for the request statistics-manager sent.
352         if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
353             return;
354
355         //Add statistics to local cache
356         final NodeStatisticsAger nsa = this.statisticsManager.getStatisticsHandler(notification.getId());
357         if (nsa != null) {
358             nsa.updateQueueStats(notification.getQueueIdAndStatisticsMap());
359         }
360     }
361 }
362