BUG 2049 DataStore failure in StatisticsManager
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatListenCommitFlow.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.md.statistics.manager.impl;
10
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.HashMap;
14 import java.util.Iterator;
15 import java.util.LinkedList;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.Map.Entry;
19 import java.util.concurrent.atomic.AtomicInteger;
20
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
25 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
26 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
27 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
28 import org.opendaylight.controller.md.statistics.manager.impl.helper.FlowComparator;
29 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowHashIdMapping;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowHashIdMappingBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMap;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMapBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMapKey;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
59 import org.opendaylight.yangtools.yang.binding.DataObject;
60 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
61 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 import com.google.common.base.Optional;
66 import com.google.common.collect.BiMap;
67 import com.google.common.collect.HashBiMap;
68
69 /**
70  * statistics-manager
71  * org.opendaylight.controller.md.statistics.manager.impl
72  *
73  * StatListenCommitFlow
74  * Class is a NotifyListener for FlowStatistics and DataChangeListener for Config/DataStore for Flow node.
75  * All expected (registered) FlowStatistics will be builded and commit to Operational/DataStore.
76  * DataChangeEven should call create/delete Flow in Operational/DS create process needs to pair
77  * Device Flow HashCode and FlowId from Config/DS
78  *
79  * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
80  *
81  */
82 public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, OpendaylightFlowStatisticsListener>
83                                             implements OpendaylightFlowStatisticsListener {
84
85     protected static final Logger LOG = LoggerFactory.getLogger(StatListenCommitFlow.class);
86
87     private static final String ALIEN_SYSTEM_FLOW_ID = "#UF$TABLE*";
88
89     private static final Integer REMOVE_AFTER_MISSING_COLLECTION = 1;
90
91     private final AtomicInteger unaccountedFlowsCounter = new AtomicInteger(0);
92
93     public StatListenCommitFlow (final StatisticsManager manager, final DataBroker db,
94             final NotificationProviderService nps){
95         super(manager, db, nps, Flow.class);
96     }
97
98     @Override
99     protected OpendaylightFlowStatisticsListener getStatNotificationListener() {
100         return this;
101     }
102
103     @Override
104     protected InstanceIdentifier<Flow> getWildCardedRegistrationPath() {
105         return InstanceIdentifier.create(Nodes.class).child(Node.class)
106                 .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class);
107     }
108
109     @Override
110     public void onAggregateFlowStatisticsUpdate(final AggregateFlowStatisticsUpdate notification) {
111         final TransactionId transId = notification.getTransactionId();
112         final NodeId nodeId = notification.getId();
113         if ( ! isExpectedStatistics(transId, nodeId)) {
114             LOG.debug("STAT-MANAGER - AggregateFlowStatisticsUpdate: unregistred notification detect TransactionId {}", transId);
115             return;
116         }
117         manager.getRpcMsgManager().addNotification(notification, nodeId);
118         if (notification.isMoreReplies()) {
119             return;
120         }
121         /* check flow Capable Node and write statistics */
122         manager.enqueue(new StatDataStoreOperation() {
123             @Override
124             public void applyOperation(final ReadWriteTransaction tx) {
125
126                 final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
127                 if (( ! txContainer.isPresent()) || txContainer.get().getNotifications() == null) {
128                     return;
129                 }
130                 final Optional<? extends DataObject> inputObj = txContainer.get().getConfInput();
131                 if (( ! inputObj.isPresent()) || ( ! (inputObj.get() instanceof Table))) {
132                     return;
133                 }
134                 final Table table = (Table) inputObj.get();
135                 final List<? extends TransactionAware> cacheNotifs = txContainer.get().getNotifications();
136                 for (final TransactionAware notif : cacheNotifs) {
137                     if (notif instanceof AggregateFlowStatisticsUpdate) {
138                         final AggregateFlowStatisticsData stats = new AggregateFlowStatisticsDataBuilder()
139                             .setAggregateFlowStatistics(new AggregateFlowStatisticsBuilder(notification).build()).build();
140                         final InstanceIdentifier<FlowCapableNode> fNodeIdent = InstanceIdentifier.create(Nodes.class)
141                                 .child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class);
142                         final InstanceIdentifier<AggregateFlowStatisticsData> tableStatRef = fNodeIdent
143                                 .child(Table.class, table.getKey()).augmentation(AggregateFlowStatisticsData.class);
144                         Optional<FlowCapableNode> fNode = Optional.absent();
145                         try {
146                             fNode = tx.read(LogicalDatastoreType.OPERATIONAL, fNodeIdent).checkedGet();
147                         } catch (final ReadFailedException e) {
148                             LOG.debug("Read Operational/DS for FlowCapableNode fail! {}", fNodeIdent, e);
149                             return;
150                         }
151                         if (fNode.isPresent()) {
152                             tx.put(LogicalDatastoreType.OPERATIONAL, tableStatRef, stats, true);
153                         }
154                     }
155                 }
156             }
157         });
158     }
159
160     @Override
161     public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
162         final TransactionId transId = notification.getTransactionId();
163         final NodeId nodeId = notification.getId();
164         if ( ! isExpectedStatistics(transId, nodeId)) {
165             LOG.debug("STAT-MANAGER - FlowsStatisticsUpdate: unregistred notification detect TransactionId {}", transId);
166             return;
167         }
168         manager.getRpcMsgManager().addNotification(notification, nodeId);
169         if (notification.isMoreReplies()) {
170             LOG.trace("Next notification for join txId {}", transId);
171             return;
172         }
173         /* add flow's statistics */
174         manager.enqueue(new StatDataStoreOperation() {
175             @Override
176             public void applyOperation(final ReadWriteTransaction tx) {
177                 final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
178                 if (( ! txContainer.isPresent()) || txContainer.get().getNotifications() == null) {
179                     return;
180                 }
181                 final List<FlowAndStatisticsMapList> flowStats = new ArrayList<FlowAndStatisticsMapList>(10);
182                 final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier.create(Nodes.class)
183                         .child(Node.class, new NodeKey(nodeId));
184                 final List<? extends TransactionAware> cacheNotifs = txContainer.get().getNotifications();
185                 for (final TransactionAware notif : cacheNotifs) {
186                     if (notif instanceof FlowsStatisticsUpdate) {
187                         final List<FlowAndStatisticsMapList> notifList =
188                                 ((FlowsStatisticsUpdate) notif).getFlowAndStatisticsMapList();
189                         if (notifList != null) {
190                             flowStats.addAll(notifList);
191                         }
192                     }
193                 }
194
195                 statsFlowCommitAll(flowStats, nodeIdent, tx);
196                 /* cleaning all not cached hash collisions */
197                 final Map<InstanceIdentifier<Flow>, Integer> listAliens = mapNodesForDelete.get(nodeIdent);
198                 if (listAliens != null) {
199                     for (final Entry<InstanceIdentifier<Flow>, Integer> nodeForDelete : listAliens.entrySet()) {
200                         final Integer lifeIndex = nodeForDelete.getValue();
201                         if (nodeForDelete.getValue() > 0) {
202                             nodeForDelete.setValue(Integer.valueOf(lifeIndex.intValue() - 1));
203                         } else {
204                             final InstanceIdentifier<Flow> flowNodeIdent = nodeForDelete.getKey();
205                             mapNodesForDelete.get(nodeIdent).remove(flowNodeIdent);
206                             tx.delete(LogicalDatastoreType.OPERATIONAL, flowNodeIdent);
207                         }
208                     }
209                 }
210                 /* Notification for continue collecting statistics */
211                 notifyToCollectNextStatistics(nodeIdent);
212             }
213         });
214     }
215
216     private void statsFlowCommitAll(final List<FlowAndStatisticsMapList> list,
217             final InstanceIdentifier<Node> nodeIdent, final ReadWriteTransaction tx) {
218
219         final InstanceIdentifier<FlowCapableNode> fNodeIdent = nodeIdent.augmentation(FlowCapableNode.class);
220
221         final Optional<FlowCapableNode> fNode;
222         try {
223             fNode = tx.read(LogicalDatastoreType.OPERATIONAL, fNodeIdent).checkedGet();
224         }
225         catch (final ReadFailedException e) {
226             LOG.debug("Read FlowCapableNode {} in Operational/DS fail! Statistic scan not be updated.", nodeIdent, e);
227             return;
228         }
229         if ( ! fNode.isPresent()) {
230             LOG.trace("FlowCapableNode {} is not presented in Operational/DS. Statisticscan not be updated.", nodeIdent);
231             return;
232         }
233
234         final NodeUpdateState nodeState = new NodeUpdateState(fNodeIdent,fNode.get());
235
236         for (final FlowAndStatisticsMapList flowStat : list) {
237             final TableKey tableKey = new TableKey(flowStat.getTableId());
238             final TableFlowUpdateState tableState = nodeState.getTable(tableKey, tx);
239             tableState.reportFlow(flowStat,tx);
240         }
241
242         for (final TableFlowUpdateState table : nodeState.getTables()) {
243             table.removeUnreportedFlows(tx);
244         }
245     }
246
247     /**
248      * Method adds statistics to Flow
249      *
250      * @param flowBuilder
251      * @param deviceFlow
252      */
253     private void addStatistics(final FlowBuilder flowBuilder, final FlowAndStatisticsMapList deviceFlow) {
254         final FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder(deviceFlow);
255         final FlowStatisticsBuilder flowStatisticsBuilder = new FlowStatisticsBuilder(stats.build());
256         final FlowStatisticsDataBuilder flowStatisticsData =new FlowStatisticsDataBuilder();
257         flowStatisticsData.setFlowStatistics(flowStatisticsBuilder.build());
258         flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
259     }
260
261     /**
262      * build pseudoUnique hashCode for flow in table
263      * for future easy identification
264      *
265      * FIXME: we expect same version for YANG models for all clusters and that has to be fix
266      * FIXME: CREATE BETTER KEY - for flow (MATCH is the problem)
267      */
268     static String buildFlowIdOperKey(final FlowAndStatisticsMapList deviceFlow) {
269         return new StringBuffer().append(deviceFlow.getMatch())
270                 .append(deviceFlow.getPriority()).append(deviceFlow.getCookie().getValue()).toString();
271     }
272
273     private class NodeUpdateState {
274         private final InstanceIdentifier<FlowCapableNode> nodeIdentifier;
275         private final Map<TableKey,TableFlowUpdateState> tables = new HashMap<>();
276
277         public NodeUpdateState(final InstanceIdentifier<FlowCapableNode> fNodeIdent, final FlowCapableNode flowCapableNode) {
278             nodeIdentifier = fNodeIdent;
279             final List<Table> tableList = flowCapableNode.getTable();
280             if(tableList != null) {
281             for (final Table table : tableList) {
282                 final TableKey tableKey = table.getKey();
283                     tables.put(tableKey, new TableFlowUpdateState(nodeIdentifier.child(Table.class,tableKey),table));
284                 }
285             }
286         }
287
288         public Iterable<TableFlowUpdateState> getTables() {
289             return tables.values();
290         }
291
292         TableFlowUpdateState getTable(final TableKey key,final ReadWriteTransaction tx) {
293             TableFlowUpdateState table = tables.get(key);
294             if(table == null) {
295                 table = new TableFlowUpdateState(nodeIdentifier.child(Table.class, key), null);
296                 tables.put(key, table);
297             }
298             return table;
299         }
300     }
301
302     private class TableFlowUpdateState {
303
304         private boolean tableEnsured = false;
305         final KeyedInstanceIdentifier<Table, TableKey> tableRef;
306         final TableKey tableKey;
307         final BiMap<FlowHashIdMapKey, FlowId> flowIdByHash;
308         List<Flow> configFlows;
309
310         public TableFlowUpdateState(final KeyedInstanceIdentifier<Table, TableKey> tablePath, final Table table) {
311             tableRef = tablePath;
312             tableKey = tablePath.getKey();
313             flowIdByHash = HashBiMap.create();
314             if(table != null) {
315                 final FlowHashIdMapping flowHashMapping = table.getAugmentation(FlowHashIdMapping.class);
316                 if (flowHashMapping != null) {
317                     final List<FlowHashIdMap>  flowHashMap = flowHashMapping.getFlowHashIdMap() != null
318                             ? flowHashMapping.getFlowHashIdMap() : Collections.<FlowHashIdMap> emptyList();
319                     for (final FlowHashIdMap flowHashId : flowHashMap) {
320                         try {
321                             flowIdByHash.put(flowHashId.getKey(), flowHashId.getFlowId());
322                         } catch (final Exception e) {
323                             LOG.warn("flow hashing hit a duplicate for {} -> {}", flowHashId.getKey(), flowHashId.getFlowId());
324                         }
325                     }
326                 }
327             }
328         }
329
330         private void ensureTable(final ReadWriteTransaction tx) {
331             if( ! tableEnsured) {
332                 final FlowHashIdMapping emptyMapping = new FlowHashIdMappingBuilder()
333                     .setFlowHashIdMap(Collections.<FlowHashIdMap> emptyList()).build();
334                 tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef.augmentation(FlowHashIdMapping.class), emptyMapping, true);
335                 tableEnsured = true;
336             }
337         }
338
339         private FlowKey searchInConfiguration(final FlowAndStatisticsMapList flowStat, final ReadWriteTransaction trans) {
340             initConfigFlows(trans);
341             final Iterator<Flow> it = configFlows.iterator();
342             while(it.hasNext()) {
343                 final Flow cfgFlow = it.next();
344                 final FlowKey cfgKey = cfgFlow.getKey();
345                 if(flowIdByHash.inverse().containsKey(cfgKey)) {
346                     it.remove();
347                 } else if(FlowComparator.flowEquals(flowStat, cfgFlow)) {
348                     it.remove();
349                     return cfgKey;
350                 }
351             }
352             return null;
353         }
354
355         private void initConfigFlows(final ReadWriteTransaction trans) {
356             final Optional<Table> table = readLatestConfiguration(tableRef);
357             List<Flow> localList = null;
358             if(table.isPresent()) {
359                 localList = table.get().getFlow();
360             }
361             if(localList == null) {
362                 configFlows = Collections.emptyList();
363             } else {
364                 configFlows = new LinkedList<>(localList);
365             }
366         }
367
368         private FlowKey getFlowKeyAndRemoveHash(final FlowHashIdMapKey key) {
369             final FlowId ret = flowIdByHash.get(key);
370             if(ret != null) {
371                 flowIdByHash.remove(key);
372                 return new FlowKey(ret);
373             }
374             return null;
375         }
376
377         /* Returns FlowKey which doesn't exist in any DataStore for now */
378         private FlowKey makeAlienFlowKey() {
379             final StringBuilder sBuilder = new StringBuilder(ALIEN_SYSTEM_FLOW_ID)
380                 .append(tableKey.getId()).append("-").append(unaccountedFlowsCounter.incrementAndGet());
381             final FlowId flowId = new FlowId(sBuilder.toString());
382             return new FlowKey(flowId);
383         }
384
385         private Map<FlowHashIdMapKey, FlowId> getRemovalList() {
386             return flowIdByHash;
387         }
388
389         void reportFlow(final FlowAndStatisticsMapList flowStat, final ReadWriteTransaction trans) {
390             ensureTable(trans);
391             final FlowHashIdMapKey hashingKey = new FlowHashIdMapKey(buildFlowIdOperKey(flowStat));
392             FlowKey flowKey = getFlowKeyAndRemoveHash(hashingKey);
393             if (flowKey == null) {
394                 flowKey = searchInConfiguration(flowStat, trans);
395                 if ( flowKey == null) {
396                     flowKey = makeAlienFlowKey();
397                 }
398                 updateHashCache(trans,flowKey,hashingKey);
399             }
400             final FlowBuilder flowBuilder = new FlowBuilder(flowStat);
401             flowBuilder.setKey(flowKey);
402             addStatistics(flowBuilder, flowStat);
403             final InstanceIdentifier<Flow> flowIdent = tableRef.child(Flow.class, flowKey);
404             trans.put(LogicalDatastoreType.OPERATIONAL, flowIdent, flowBuilder.build());
405             /* check life for Alien flows */
406             if (flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
407                 removeData(flowIdent, REMOVE_AFTER_MISSING_COLLECTION);
408             }
409         }
410
411         /* Build and deploy new FlowHashId map */
412         private void updateHashCache(final ReadWriteTransaction trans, final FlowKey flowKey, final FlowHashIdMapKey hashingKey) {
413             final FlowHashIdMapBuilder flHashIdMap = new FlowHashIdMapBuilder();
414             flHashIdMap.setFlowId(flowKey.getId());
415             flHashIdMap.setKey(hashingKey);
416             final KeyedInstanceIdentifier<FlowHashIdMap, FlowHashIdMapKey> flHashIdent = tableRef
417                     .augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, hashingKey);
418             /* Add new FlowHashIdMap */
419             trans.put(LogicalDatastoreType.OPERATIONAL, flHashIdent, flHashIdMap.build());
420         }
421
422         void removeUnreportedFlows(final ReadWriteTransaction tx) {
423             final InstanceIdentifier<Node> nodeIdent = tableRef.firstIdentifierOf(Node.class);
424             final Map<InstanceIdentifier<Flow>, Integer> nodeDeleteMap = mapNodesForDelete.get(nodeIdent);
425             final Map<FlowHashIdMapKey, FlowId> listForRemove = getRemovalList();
426             for (final Entry<FlowHashIdMapKey, FlowId> entryForRemove : listForRemove.entrySet()) {
427                 final FlowKey flowKey = new FlowKey(entryForRemove.getValue());
428                 final InstanceIdentifier<Flow> flowRef = tableRef.child(Flow.class, flowKey);
429                 if (nodeDeleteMap != null && flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
430                     final Integer lifeIndex = nodeDeleteMap.get(flowRef);
431                     if (lifeIndex > 0) {
432                         break;
433                     } else {
434                         nodeDeleteMap.remove(flowRef);
435                     }
436                 }
437                 final InstanceIdentifier<FlowHashIdMap> flHashIdent =
438                         tableRef.augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, entryForRemove.getKey());
439                 tx.delete(LogicalDatastoreType.OPERATIONAL, flowRef);
440                 tx.delete(LogicalDatastoreType.OPERATIONAL, flHashIdent);
441             }
442         }
443     }
444 }
445