Merge "Bug 6110: Fixed bugs in statistics manager due to race condition." into stable...
[openflowplugin.git] / applications / statistics-manager / src / main / java / org / opendaylight / openflowplugin / applications / statistics / manager / impl / StatListenCommitFlow.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
10
11 import com.google.common.base.Optional;
12 import com.google.common.collect.BiMap;
13 import com.google.common.collect.HashBiMap;
14 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
15 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
16 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
17 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
18 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
19 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
20 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
21 import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
22 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
23 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
24 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation;
25 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
26 import org.opendaylight.openflowplugin.applications.statistics.manager.impl.helper.FlowComparator;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowHashIdMapping;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowHashIdMappingBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMap;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMapBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMapKey;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionAware;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
58 import org.opendaylight.yangtools.yang.binding.DataObject;
59 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
60 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 import java.math.BigInteger;
65 import java.util.ArrayList;
66 import java.util.Collection;
67 import java.util.Collections;
68 import java.util.HashMap;
69 import java.util.Iterator;
70 import java.util.List;
71 import java.util.Map;
72 import java.util.Map.Entry;
73 import java.util.UUID;
74 import java.util.concurrent.atomic.AtomicInteger;
75
76 /**
77  * statistics-manager
78  * org.opendaylight.openflowplugin.applications.statistics.manager.impl
79  *
80  * StatListenCommitFlow
81  * Class is a NotifyListener for FlowStatistics and DataTreeChangeListener for Config/DataStore for Flow node.
82  * All expected (registered) FlowStatistics will be builded and commit to Operational/DataStore.
83  * DataTreeModification should call create/delete Flow in Operational/DS create process needs to pair
84  * Device Flow HashCode and FlowId from Config/DS
85  */
86 public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, OpendaylightFlowStatisticsListener>
87                                             implements OpendaylightFlowStatisticsListener {
88
89     protected static final Logger LOG = LoggerFactory.getLogger(StatListenCommitFlow.class);
90
91     private static final String ALIEN_SYSTEM_FLOW_ID = "#UF$TABLE*";
92
93     private static final Integer REMOVE_AFTER_MISSING_COLLECTION = 1;
94     private static final int TRUNCATED_LOG_MESSAGE_LENGTH = 30;
95
96     private final AtomicInteger unaccountedFlowsCounter = new AtomicInteger(0);
97
98     public StatListenCommitFlow (final StatisticsManager manager, final DataBroker db,
99             final NotificationProviderService nps,
100                                  final StatNodeRegistration nrm){
101         super(manager, db, nps, Flow.class,nrm);
102     }
103
104     @Override
105     protected OpendaylightFlowStatisticsListener getStatNotificationListener() {
106         return this;
107     }
108
109     @Override
110     protected InstanceIdentifier<Flow> getWildCardedRegistrationPath() {
111         return InstanceIdentifier.create(Nodes.class).child(Node.class)
112                 .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class);
113     }
114
115     @Override
116     public void onAggregateFlowStatisticsUpdate(final AggregateFlowStatisticsUpdate notification) {
117         final TransactionId transId = notification.getTransactionId();
118         final NodeId nodeId = notification.getId();
119         if ( ! isExpectedStatistics(transId, nodeId)) {
120             LOG.debug("STAT-MANAGER - AggregateFlowStatisticsUpdate: unregistred notification detect TransactionId {}", transId);
121             return;
122         }
123         manager.getRpcMsgManager().addNotification(notification, nodeId);
124         if (notification.isMoreReplies()) {
125             return;
126         }
127         /* check flow Capable Node and write statistics */
128         manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
129             @Override
130             public void applyOperation(final ReadWriteTransaction tx) {
131
132                 final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
133                 if (( ! txContainer.isPresent()) || txContainer.get().getNotifications() == null) {
134                     return;
135                 }
136                 final Optional<? extends DataObject> inputObj = txContainer.get().getConfInput();
137                 if (( ! inputObj.isPresent()) || ( ! (inputObj.get() instanceof Table))) {
138                     return;
139                 }
140
141                 if(!nodeRegistrationManager.isFlowCapableNodeOwner(nodeId)) { return; }
142
143                 final Table table = (Table) inputObj.get();
144                 final List<? extends TransactionAware> cacheNotifs = txContainer.get().getNotifications();
145                 for (final TransactionAware notif : cacheNotifs) {
146                     if (notif instanceof AggregateFlowStatisticsUpdate) {
147                         final AggregateFlowStatisticsData stats = new AggregateFlowStatisticsDataBuilder()
148                             .setAggregateFlowStatistics(new AggregateFlowStatisticsBuilder(notification).build()).build();
149                         final InstanceIdentifier<FlowCapableNode> fNodeIdent = InstanceIdentifier.create(Nodes.class)
150                                 .child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class);
151                         final InstanceIdentifier<Table> tableRef = fNodeIdent.child(Table.class, table.getKey());
152                         final InstanceIdentifier<AggregateFlowStatisticsData> tableStatRef = tableRef
153                                 .augmentation(AggregateFlowStatisticsData.class);
154                         Optional<FlowCapableNode> fNode = Optional.absent();
155                         try {
156                             fNode = tx.read(LogicalDatastoreType.OPERATIONAL, fNodeIdent).checkedGet();
157                         } catch (final ReadFailedException e) {
158                             LOG.debug("Read Operational/DS for FlowCapableNode fail! {}", fNodeIdent, e);
159                             return;
160                         }
161                         if (fNode.isPresent()) {
162                             ensureTable(tx, table.getId(), tableRef);
163                             tx.put(LogicalDatastoreType.OPERATIONAL, tableStatRef, stats);
164                         }
165                     }
166                 }
167             }
168
169             @Override
170             public UUID generatedUUIDForNode() {
171                 return manager.getGeneratedUUIDForNode(getNodeIdentifier());
172             }
173         });
174     }
175
176     public void ensureTable(final ReadWriteTransaction tx, final Short tableId, final InstanceIdentifier<Table> tableRef) {
177         final Table tableNew = new TableBuilder().setId(tableId).build();
178         tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef, tableNew);
179     }
180
181     protected void processDataChange(Collection<DataTreeModification<Flow>> changes) {
182         if (!changes.isEmpty()) {
183             for (DataTreeModification<Flow> dataChange : changes) {
184                 if (dataChange.getRootNode().getModificationType() == DataObjectModification.ModificationType.DELETE) {
185                     final InstanceIdentifier<Node> nodeIdent = dataChange.getRootPath().getRootIdentifier()
186                             .firstIdentifierOf(Node.class);
187                     if (!removedDataBetweenStatsCycle.containsKey(nodeIdent)) {
188                         removedDataBetweenStatsCycle.put(nodeIdent, new ArrayList<>());
189                     }
190                     Flow data = dataChange.getRootNode().getDataBefore();
191                     removedDataBetweenStatsCycle.get(nodeIdent).add(data);
192                     LOG.debug("Node: {} :: Flow removed {}",nodeIdent.firstKeyOf(Node.class).getId(), data.toString());
193                 }
194             }
195         }
196     }
197
198     @Override
199     public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
200         final TransactionId transId = notification.getTransactionId();
201         final NodeId nodeId = notification.getId();
202         if ( ! isExpectedStatistics(transId, nodeId)) {
203             LOG.debug("STAT-MANAGER - FlowsStatisticsUpdate: unregistered notification detect TransactionId {}",
204                     transId);
205             return;
206         }
207         manager.getRpcMsgManager().addNotification(notification, nodeId);
208         if (notification.isMoreReplies()) {
209             LOG.trace("Next notification for join txId {}", transId);
210             return;
211         }
212         /* add flow's statistics */
213         manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
214             @Override
215             public void applyOperation(final ReadWriteTransaction tx) {
216                 final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
217                 if (( ! txContainer.isPresent()) || txContainer.get().getNotifications() == null) {
218                     return;
219                 }
220                 if(!nodeRegistrationManager.isFlowCapableNodeOwner(nodeId)) { return; }
221
222                 final List<FlowAndStatisticsMapList> flowStats = new ArrayList<FlowAndStatisticsMapList>(10);
223                 final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier.create(Nodes.class)
224                         .child(Node.class, new NodeKey(nodeId));
225                 final List<? extends TransactionAware> cacheNotifs = txContainer.get().getNotifications();
226                 for (final TransactionAware notif : cacheNotifs) {
227                     if (notif instanceof FlowsStatisticsUpdate) {
228                         final List<FlowAndStatisticsMapList> notifList =
229                                 ((FlowsStatisticsUpdate) notif).getFlowAndStatisticsMapList();
230                         if (notifList != null) {
231                             flowStats.addAll(notifList);
232                         }
233                     }
234                 }
235
236                 statsFlowCommitAll(flowStats, nodeIdent, tx);
237                 /* cleaning all not cached hash collisions */
238                 final Map<InstanceIdentifier<Flow>, Integer> listAliens = mapNodesForDelete.get(nodeIdent);
239                 if (listAliens != null) {
240                     for (final Entry<InstanceIdentifier<Flow>, Integer> nodeForDelete : listAliens.entrySet()) {
241                         final Integer lifeIndex = nodeForDelete.getValue();
242                         if (nodeForDelete.getValue() > 0) {
243                             nodeForDelete.setValue(Integer.valueOf(lifeIndex.intValue() - 1));
244                         } else {
245                             final InstanceIdentifier<Flow> flowNodeIdent = nodeForDelete.getKey();
246                             mapNodesForDelete.get(nodeIdent).remove(flowNodeIdent);
247                             tx.delete(LogicalDatastoreType.OPERATIONAL, flowNodeIdent);
248                         }
249                     }
250                 }
251                 /* Notification for continue collecting statistics */
252                 notifyToCollectNextStatistics(nodeIdent, transId);
253             }
254
255             @Override
256             public UUID generatedUUIDForNode() {
257                 return manager.getGeneratedUUIDForNode(getNodeIdentifier());
258             }
259
260         });
261     }
262
263     private void statsFlowCommitAll(final List<FlowAndStatisticsMapList> list,
264             final InstanceIdentifier<Node> nodeIdent, final ReadWriteTransaction tx) {
265
266         final InstanceIdentifier<FlowCapableNode> fNodeIdent = nodeIdent.augmentation(FlowCapableNode.class);
267
268         //cleanup the hashmap ID for the flows deleted between two stats cycle, also cleanup the
269         // data change cache as well.
270         ArrayList<Flow> deletedFlows = removedDataBetweenStatsCycle.remove(nodeIdent);
271
272         if (deletedFlows != null && !deletedFlows.isEmpty()) {
273             LOG.trace("Number of flows deleted from node {} between two stats cycles are {}", nodeIdent, deletedFlows
274                     .size());
275         }
276
277         final Optional<FlowCapableNode> fNode;
278         try {
279             fNode = tx.read(LogicalDatastoreType.OPERATIONAL, fNodeIdent).checkedGet();
280         }
281         catch (final ReadFailedException e) {
282             LOG.debug("Read FlowCapableNode {} in Operational/DS fail! Statistic scan not be updated.", nodeIdent, e);
283             return;
284         }
285         if ( ! fNode.isPresent()) {
286             LOG.trace("FlowCapableNode {} is not presented in Operational/DS. Statisticscan not be updated.", nodeIdent);
287             return;
288         }
289
290         final NodeUpdateState nodeState = new NodeUpdateState(fNodeIdent,fNode.get());
291
292         for (final FlowAndStatisticsMapList flowStat : list) {
293             final TableKey tableKey = new TableKey(flowStat.getTableId());
294             final TableFlowUpdateState tableState = nodeState.getTable(tableKey, tx);
295             Flow removedConfigFlow = getFlowIfRemoved(flowStat, deletedFlows);
296             if (removedConfigFlow == null) {
297                 tableState.reportFlow(flowStat,tx, false);
298             } else {
299                 deletedFlows.remove(removedConfigFlow);
300                 tableState.reportFlow(flowStat,tx, true);
301             }
302         }
303
304         if (deletedFlows != null ) {
305             deletedFlows.clear();
306         }
307         for (final TableFlowUpdateState table : nodeState.getTables()) {
308             table.removeUnreportedFlows(tx);
309         }
310     }
311
312     private Flow getFlowIfRemoved(FlowAndStatisticsMapList flowStat, ArrayList<Flow> deletedFlows) {
313         if (deletedFlows != null && !deletedFlows.isEmpty()) {
314             for (Flow flow : deletedFlows) {
315                 final FlowAndStatisticsMapList configFlowStats = new FlowAndStatisticsMapListBuilder(flow).build();
316                 if ( flowStat.getMatch().equals(configFlowStats.getMatch()) &&
317                         flowStat.getPriority().equals(configFlowStats.getPriority()) &&
318                         flowStat.getCookie().equals(configFlowStats.getCookie()!=null?configFlowStats.getCookie():
319                                 new FlowCookie(new BigInteger("0")))) {
320                     LOG.debug("Flow statistics {} are related to flow {}, but it's REMOVED from the config data store" +
321                             "store", flowStat, flow);
322                     return flow;
323                 }
324             }
325         }
326         return null;
327     }
328     /**
329      * Method adds statistics to Flow
330      *
331      * @param flowBuilder
332      * @param deviceFlow
333      */
334     private void addStatistics(final FlowBuilder flowBuilder, final FlowAndStatisticsMapList deviceFlow) {
335         final FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder(deviceFlow);
336         final FlowStatisticsBuilder flowStatisticsBuilder = new FlowStatisticsBuilder(stats.build());
337         final FlowStatisticsDataBuilder flowStatisticsData =new FlowStatisticsDataBuilder();
338         flowStatisticsData.setFlowStatistics(flowStatisticsBuilder.build());
339         flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
340     }
341
342     /**
343      * build pseudoUnique hashCode for flow in table
344      * for future easy identification
345      *
346      * FIXME: we expect same version for YANG models for all clusters and that has to be fix
347      * FIXME: CREATE BETTER KEY - for flow (MATCH is the problem)
348      */
349     static String buildFlowIdOperKey(final FlowAndStatisticsMapList deviceFlow) {
350         return new StringBuilder().append(deviceFlow.getMatch())
351                 .append(deviceFlow.getPriority()).append(deviceFlow.getCookie().getValue()).toString();
352     }
353
354     private class NodeUpdateState {
355         private final InstanceIdentifier<FlowCapableNode> nodeIdentifier;
356         private final Map<TableKey,TableFlowUpdateState> tables = new HashMap<>();
357
358         public NodeUpdateState(final InstanceIdentifier<FlowCapableNode> fNodeIdent, final FlowCapableNode flowCapableNode) {
359             nodeIdentifier = fNodeIdent;
360             final List<Table> tableList = flowCapableNode.getTable();
361             if(tableList != null) {
362             for (final Table table : tableList) {
363                 final TableKey tableKey = table.getKey();
364                     tables.put(tableKey, new TableFlowUpdateState(nodeIdentifier.child(Table.class,tableKey),table));
365                 }
366             }
367         }
368
369         public Iterable<TableFlowUpdateState> getTables() {
370             return tables.values();
371         }
372
373         TableFlowUpdateState getTable(final TableKey key,final ReadWriteTransaction tx) {
374             TableFlowUpdateState table = tables.get(key);
375             if(table == null) {
376                 table = new TableFlowUpdateState(nodeIdentifier.child(Table.class, key), null);
377                 tables.put(key, table);
378             }
379             return table;
380         }
381     }
382
383     private class TableFlowUpdateState {
384
385         private boolean tableEnsured = false;
386         final KeyedInstanceIdentifier<Table, TableKey> tableRef;
387         final TableKey tableKey;
388         final BiMap<FlowHashIdMapKey, FlowId> flowIdByHash;
389         List<Flow> configFlows;
390
391         public TableFlowUpdateState(final KeyedInstanceIdentifier<Table, TableKey> tablePath, final Table table) {
392             tableRef = tablePath;
393             tableKey = tablePath.getKey();
394             flowIdByHash = HashBiMap.create();
395             if(table != null) {
396                 final FlowHashIdMapping flowHashMapping = table.getAugmentation(FlowHashIdMapping.class);
397                 if (flowHashMapping != null) {
398                     final List<FlowHashIdMap>  flowHashMap = flowHashMapping.getFlowHashIdMap() != null
399                             ? flowHashMapping.getFlowHashIdMap() : Collections.<FlowHashIdMap> emptyList();
400                     for (final FlowHashIdMap flowHashId : flowHashMap) {
401                         try {
402                             flowIdByHash.put(flowHashId.getKey(), flowHashId.getFlowId());
403                         } catch (final Exception e) {
404                             //flowHashId.getKey() too verbose for standard log.
405                             if(LOG.isDebugEnabled()) {
406                                 final FlowId currData = flowIdByHash.get(flowHashId.getKey());
407                                 LOG.debug("Flow hashing hit a duplicate for {} -> {}. Curr value: {} Equals:{}. " +
408                                         "Exception was raised:",
409                                     flowHashId.getKey(), flowHashId.getFlowId(), currData, flowHashId.getFlowId().equals(currData), e);
410                             }
411                             else
412                             {
413                                 LOG.warn("Flow hashing hit a duplicate {}. Exception was raised: {}. Enable DEBUG for" +
414                                         " more detail.",
415                                     flowHashId.getFlowId().toString().substring(0, Math.min(TRUNCATED_LOG_MESSAGE_LENGTH,flowHashId.getFlowId().toString().length())),
416                                     e.getMessage().substring(0,Math.min(TRUNCATED_LOG_MESSAGE_LENGTH,e.getMessage().length())));
417                             }
418                         }
419                     }
420                 }
421             }
422         }
423
424         private void ensureTableFowHashIdMapping(final ReadWriteTransaction tx) {
425             if( ! tableEnsured) {
426                 ensureTable(tx, tableKey.getId(), tableRef);
427                 final FlowHashIdMapping emptyMapping = new FlowHashIdMappingBuilder()
428                     .setFlowHashIdMap(Collections.<FlowHashIdMap> emptyList()).build();
429                 tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef.augmentation(FlowHashIdMapping.class), emptyMapping);
430                 tableEnsured = true;
431             }
432         }
433
434         private FlowKey searchInConfiguration(final FlowAndStatisticsMapList flowStat, final ReadWriteTransaction trans) {
435             initConfigFlows();
436             final Iterator<Flow> it = configFlows.iterator();
437             while(it.hasNext()) {
438                 final Flow cfgFlow = it.next();
439                 final FlowKey cfgKey = cfgFlow.getKey();
440                 final FlowId cfgFlowId = cfgKey.getId();
441
442                 if(! flowIdByHash.inverse().containsKey(cfgFlowId)) {
443                     if(FlowComparator.flowEquals(flowStat, cfgFlow)) {
444                         return cfgKey;
445                     }
446                 }
447             }
448             return null;
449         }
450
451         private void initConfigFlows() {
452             final Optional<Table> table = readLatestConfiguration(tableRef);
453             List<Flow> localList = null;
454             if(table.isPresent()) {
455                 localList = table.get().getFlow();
456             }
457             if(localList == null) {
458                 configFlows = Collections.emptyList();
459             } else {
460                 configFlows = new ArrayList<>(localList);
461             }
462         }
463
464         private FlowKey getFlowKeyByHash(final FlowHashIdMapKey key) {
465             final FlowId ret = flowIdByHash.get(key);
466             if(ret != null) {
467                 return new FlowKey(ret);
468             }
469             return null;
470         }
471
472         /* Returns FlowKey which doesn't exist in any DataStore for now */
473         private FlowKey makeAlienFlowKey() {
474             final StringBuilder sBuilder = new StringBuilder(ALIEN_SYSTEM_FLOW_ID)
475                 .append(tableKey.getId()).append("-").append(unaccountedFlowsCounter.incrementAndGet());
476             final FlowId flowId = new FlowId(sBuilder.toString());
477             return new FlowKey(flowId);
478         }
479
480         private Map<FlowHashIdMapKey, FlowId> getRemovalList() {
481             return flowIdByHash;
482         }
483
484         void reportFlow(final FlowAndStatisticsMapList flowStat,
485                         final ReadWriteTransaction trans,
486                         boolean wasRemoved) {
487             ensureTableFowHashIdMapping(trans);
488             final FlowHashIdMapKey hashingKey = new FlowHashIdMapKey(buildFlowIdOperKey(flowStat));
489             FlowKey flowKey = getFlowKeyByHash(hashingKey);
490             if (flowKey == null || wasRemoved) {
491                 flowKey = searchInConfiguration(flowStat, trans);
492                 if ( flowKey == null) {
493                     flowKey = makeAlienFlowKey();
494                 }
495                 updateHashCache(trans,flowKey,hashingKey);
496             } else {
497                 flowIdByHash.remove(hashingKey);
498             }
499             final FlowBuilder flowBuilder = new FlowBuilder(flowStat);
500             flowBuilder.setKey(flowKey);
501             addStatistics(flowBuilder, flowStat);
502             final InstanceIdentifier<Flow> flowIdent = tableRef.child(Flow.class, flowKey);
503             trans.put(LogicalDatastoreType.OPERATIONAL, flowIdent, flowBuilder.build());
504             /* check life for Alien flows */
505             if (flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
506                 removeData(flowIdent, REMOVE_AFTER_MISSING_COLLECTION);
507             }
508         }
509
510         /* Build and deploy new FlowHashId map */
511         private void updateHashCache(final ReadWriteTransaction trans, final FlowKey flowKey, final FlowHashIdMapKey hashingKey) {
512             final FlowHashIdMapBuilder flHashIdMap = new FlowHashIdMapBuilder();
513             flHashIdMap.setFlowId(flowKey.getId());
514             flHashIdMap.setKey(hashingKey);
515             final KeyedInstanceIdentifier<FlowHashIdMap, FlowHashIdMapKey> flHashIdent = tableRef
516                     .augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, hashingKey);
517             /* Add new FlowHashIdMap */
518             trans.put(LogicalDatastoreType.OPERATIONAL, flHashIdent, flHashIdMap.build());
519         }
520
521         void removeUnreportedFlows(final ReadWriteTransaction tx) {
522             final InstanceIdentifier<Node> nodeIdent = tableRef.firstIdentifierOf(Node.class);
523             final List<InstanceIdentifier<Flow>> listMissingConfigFlows = notStatReportedConfigFlows();
524             final Map<InstanceIdentifier<Flow>, Integer> nodeDeleteMap = mapNodesForDelete.get(nodeIdent);
525             final Map<FlowHashIdMapKey, FlowId> listForRemove = getRemovalList();
526             for (final Entry<FlowHashIdMapKey, FlowId> entryForRemove : listForRemove.entrySet()) {
527                 final FlowKey flowKey = new FlowKey(entryForRemove.getValue());
528                 final InstanceIdentifier<Flow> flowRef = tableRef.child(Flow.class, flowKey);
529                 if (nodeDeleteMap != null && flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
530                     final Integer lifeIndex = nodeDeleteMap.get(flowRef);
531                      if (lifeIndex != null && lifeIndex > 0) {
532                         break;
533                     } else {
534                         nodeDeleteMap.remove(flowRef);
535                     }
536                 } else {
537                     if (listMissingConfigFlows.remove(flowRef)) {
538                         // it is probable that some multipart message was lost
539                         break;
540                     }
541                 }
542                 final InstanceIdentifier<FlowHashIdMap> flHashIdent =
543                         tableRef.augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, entryForRemove.getKey());
544                 tx.delete(LogicalDatastoreType.OPERATIONAL, flowRef);
545                 tx.delete(LogicalDatastoreType.OPERATIONAL, flHashIdent);
546             }
547         }
548
549         List<InstanceIdentifier<Flow>> notStatReportedConfigFlows() {
550             if (configFlows != null) {
551                 final List<InstanceIdentifier<Flow>> returnList = new ArrayList<>(configFlows.size());
552                 for (final Flow confFlow : configFlows) {
553                     final InstanceIdentifier<Flow> confFlowIdent = tableRef.child(Flow.class, confFlow.getKey());
554                     returnList.add(confFlowIdent);
555                 }
556                 return returnList;
557             }
558             return Collections.emptyList();
559         }
560     }
561 }
562