Fix bug 2450 - Statistics collection slow - performance
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatListenCommitFlow.java
index c5aefcbf9673087a21de9f3f1677b4976d74b77d..e17c45dc767f5050c4113c4df96b2c6aaf170232 100644 (file)
@@ -35,6 +35,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.no
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMapBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMapKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
@@ -82,10 +83,12 @@ import com.google.common.collect.HashBiMap;
 public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, OpendaylightFlowStatisticsListener>
                                             implements OpendaylightFlowStatisticsListener {
 
-    private static final Logger LOG = LoggerFactory.getLogger(StatListenCommitFlow.class);
+    protected static final Logger LOG = LoggerFactory.getLogger(StatListenCommitFlow.class);
 
     private static final String ALIEN_SYSTEM_FLOW_ID = "#UF$TABLE*";
 
+    private static final Integer REMOVE_AFTER_MISSING_COLLECTION = 1;
+
     private final AtomicInteger unaccountedFlowsCounter = new AtomicInteger(0);
 
     public StatListenCommitFlow (final StatisticsManager manager, final DataBroker db,
@@ -137,18 +140,19 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                             .setAggregateFlowStatistics(new AggregateFlowStatisticsBuilder(notification).build()).build();
                         final InstanceIdentifier<FlowCapableNode> fNodeIdent = InstanceIdentifier.create(Nodes.class)
                                 .child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class);
-                        final InstanceIdentifier<AggregateFlowStatisticsData> tableStatRef = fNodeIdent
-                                .child(Table.class, table.getKey()).augmentation(AggregateFlowStatisticsData.class);
+                        final InstanceIdentifier<Table> tableRef = fNodeIdent.child(Table.class, table.getKey());
+                        final InstanceIdentifier<AggregateFlowStatisticsData> tableStatRef = tableRef
+                                .augmentation(AggregateFlowStatisticsData.class);
                         Optional<FlowCapableNode> fNode = Optional.absent();
                         try {
                             fNode = tx.read(LogicalDatastoreType.OPERATIONAL, fNodeIdent).checkedGet();
-                        }
-                        catch (final ReadFailedException e) {
+                        } catch (final ReadFailedException e) {
                             LOG.debug("Read Operational/DS for FlowCapableNode fail! {}", fNodeIdent, e);
                             return;
                         }
                         if (fNode.isPresent()) {
-                            tx.put(LogicalDatastoreType.OPERATIONAL, tableStatRef, stats, true);
+                            ensureTable(tx, table.getId(), tableRef);
+                            tx.put(LogicalDatastoreType.OPERATIONAL, tableStatRef, stats);
                         }
                     }
                 }
@@ -156,6 +160,11 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
         });
     }
 
+    public void ensureTable(final ReadWriteTransaction tx, final Short tableId, final InstanceIdentifier<Table> tableRef) {
+        final Table tableNew = new TableBuilder().setId(tableId).build();
+        tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef, tableNew);
+    }
+
     @Override
     public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
         final TransactionId transId = notification.getTransactionId();
@@ -192,8 +201,22 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                 }
 
                 statsFlowCommitAll(flowStats, nodeIdent, tx);
+                /* cleaning all not cached hash collisions */
+                final Map<InstanceIdentifier<Flow>, Integer> listAliens = mapNodesForDelete.get(nodeIdent);
+                if (listAliens != null) {
+                    for (final Entry<InstanceIdentifier<Flow>, Integer> nodeForDelete : listAliens.entrySet()) {
+                        final Integer lifeIndex = nodeForDelete.getValue();
+                        if (nodeForDelete.getValue() > 0) {
+                            nodeForDelete.setValue(Integer.valueOf(lifeIndex.intValue() - 1));
+                        } else {
+                            final InstanceIdentifier<Flow> flowNodeIdent = nodeForDelete.getKey();
+                            mapNodesForDelete.get(nodeIdent).remove(flowNodeIdent);
+                            tx.delete(LogicalDatastoreType.OPERATIONAL, flowNodeIdent);
+                        }
+                    }
+                }
                 /* Notification for continue collecting statistics */
-                notifyToCollectNextStatistics(nodeIdent);
+                notifyToCollectNextStatistics(nodeIdent, transId);
             }
         });
     }
@@ -246,14 +269,13 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
     /**
      * build pseudoUnique hashCode for flow in table
      * for future easy identification
+     *
+     * FIXME: we expect same version for YANG models for all clusters and that has to be fix
+     * FIXME: CREATE BETTER KEY - for flow (MATCH is the problem)
      */
-    static String buildHashCode(final FlowAndStatisticsMapList deviceFlow) {
-        final FlowBuilder builder = new FlowBuilder();
-        builder.setMatch(deviceFlow.getMatch());
-        builder.setCookie(deviceFlow.getCookie());
-        builder.setPriority(deviceFlow.getPriority());
-        final Flow flowForHashCode = builder.build();
-        return String.valueOf(flowForHashCode.hashCode());
+    static String buildFlowIdOperKey(final FlowAndStatisticsMapList deviceFlow) {
+        return new StringBuffer().append(deviceFlow.getMatch())
+                .append(deviceFlow.getPriority()).append(deviceFlow.getCookie().getValue()).toString();
     }
 
     private class NodeUpdateState {
@@ -286,6 +308,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
     }
 
     private class TableFlowUpdateState {
+
         private boolean tableEnsured = false;
         final KeyedInstanceIdentifier<Table, TableKey> tableRef;
         final TableKey tableKey;
@@ -302,17 +325,22 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                     final List<FlowHashIdMap>  flowHashMap = flowHashMapping.getFlowHashIdMap() != null
                             ? flowHashMapping.getFlowHashIdMap() : Collections.<FlowHashIdMap> emptyList();
                     for (final FlowHashIdMap flowHashId : flowHashMap) {
-                        flowIdByHash.put(flowHashId.getKey(), flowHashId.getFlowId());
+                        try {
+                            flowIdByHash.put(flowHashId.getKey(), flowHashId.getFlowId());
+                        } catch (final Exception e) {
+                            LOG.warn("flow hashing hit a duplicate for {} -> {}", flowHashId.getKey(), flowHashId.getFlowId());
+                        }
                     }
                 }
             }
         }
 
-        private void ensureTable(final ReadWriteTransaction tx) {
+        private void ensureTableFowHashIdMapping(final ReadWriteTransaction tx) {
             if( ! tableEnsured) {
+                ensureTable(tx, tableKey.getId(), tableRef);
                 final FlowHashIdMapping emptyMapping = new FlowHashIdMappingBuilder()
                     .setFlowHashIdMap(Collections.<FlowHashIdMap> emptyList()).build();
-                tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef.augmentation(FlowHashIdMapping.class), emptyMapping, true);
+                tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef.augmentation(FlowHashIdMapping.class), emptyMapping);
                 tableEnsured = true;
             }
         }
@@ -334,12 +362,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
         }
 
         private void initConfigFlows(final ReadWriteTransaction trans) {
-            Optional<Table> table = readLatestConfiguration(tableRef);
-            try {
-                table = trans.read(LogicalDatastoreType.CONFIGURATION, tableRef).checkedGet();
-            } catch (final ReadFailedException e) {
-                table = Optional.absent();
-            }
+            final Optional<Table> table = readLatestConfiguration(tableRef);
             List<Flow> localList = null;
             if(table.isPresent()) {
                 localList = table.get().getFlow();
@@ -373,8 +396,8 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
         }
 
         void reportFlow(final FlowAndStatisticsMapList flowStat, final ReadWriteTransaction trans) {
-            ensureTable(trans);
-            final FlowHashIdMapKey hashingKey = new FlowHashIdMapKey(buildHashCode(flowStat));
+            ensureTableFowHashIdMapping(trans);
+            final FlowHashIdMapKey hashingKey = new FlowHashIdMapKey(buildFlowIdOperKey(flowStat));
             FlowKey flowKey = getFlowKeyAndRemoveHash(hashingKey);
             if (flowKey == null) {
                 flowKey = searchInConfiguration(flowStat, trans);
@@ -390,13 +413,12 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
             trans.put(LogicalDatastoreType.OPERATIONAL, flowIdent, flowBuilder.build());
             /* check life for Alien flows */
             if (flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
-                removeData(flowIdent, Integer.valueOf(5));
+                removeData(flowIdent, REMOVE_AFTER_MISSING_COLLECTION);
             }
         }
 
         /* Build and deploy new FlowHashId map */
         private void updateHashCache(final ReadWriteTransaction trans, final FlowKey flowKey, final FlowHashIdMapKey hashingKey) {
-            // TODO Auto-generated method stub
             final FlowHashIdMapBuilder flHashIdMap = new FlowHashIdMapBuilder();
             flHashIdMap.setFlowId(flowKey.getId());
             flHashIdMap.setKey(hashingKey);
@@ -407,44 +429,42 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
         }
 
         void removeUnreportedFlows(final ReadWriteTransaction tx) {
+            final InstanceIdentifier<Node> nodeIdent = tableRef.firstIdentifierOf(Node.class);
+            final List<InstanceIdentifier<Flow>> listMissingConfigFlows = notStatReportedConfigFlows();
+            final Map<InstanceIdentifier<Flow>, Integer> nodeDeleteMap = mapNodesForDelete.get(nodeIdent);
             final Map<FlowHashIdMapKey, FlowId> listForRemove = getRemovalList();
-            final Optional<Table> configTable = readLatestConfiguration(tableRef);
-            List<Flow> configFlows = Collections.emptyList();
-            if (configTable.isPresent() && configTable.get().getFlow() != null) {
-                configFlows = new ArrayList<>(configTable.get().getFlow());
-            }
             for (final Entry<FlowHashIdMapKey, FlowId> entryForRemove : listForRemove.entrySet()) {
                 final FlowKey flowKey = new FlowKey(entryForRemove.getValue());
                 final InstanceIdentifier<Flow> flowRef = tableRef.child(Flow.class, flowKey);
-                final InstanceIdentifier<FlowStatisticsData> flowStatIdent = flowRef.augmentation(FlowStatisticsData.class);
-                if (flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
-                    final InstanceIdentifier<Node> nodeIdent = tableRef.firstIdentifierOf(Node.class);
-                    final Integer lifeIndex = mapNodesForDelete.get(nodeIdent).remove(flowRef);
+                if (nodeDeleteMap != null && flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
+                    final Integer lifeIndex = nodeDeleteMap.get(flowRef);
                     if (lifeIndex > 0) {
-                        mapNodesForDelete.get(nodeIdent).put(flowRef, Integer.valueOf(lifeIndex.intValue() - 1));
                         break;
+                    } else {
+                        nodeDeleteMap.remove(flowRef);
                     }
                 } else {
-                    if (configFlows.remove(flowRef)) {
-                        /* Node is still presented in Config/DataStore - probably lost some multipart msg */
-                        break;
+                    if (listMissingConfigFlows.remove(flowRef)) {
+                        break; // we probably lost some multipart msg
                     }
                 }
-                final Optional<FlowStatisticsData> flowStatNodeCheck;
-                try {
-                    flowStatNodeCheck = tx.read(LogicalDatastoreType.OPERATIONAL, flowStatIdent).checkedGet();
-                }
-                catch (final ReadFailedException e) {
-                    LOG.debug("Read FlowStatistics {} in Operational/DS fail! Statisticscan not beupdated.", flowStatIdent, e);
-                    break;
-                }
-                if (flowStatNodeCheck.isPresent()) {
-                    /* Node isn't new and it has not been removed yet */
-                    final InstanceIdentifier<FlowHashIdMap> flHashIdent = tableRef.augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, entryForRemove.getKey());
-                    tx.delete(LogicalDatastoreType.OPERATIONAL, flowRef);
-                    tx.delete(LogicalDatastoreType.OPERATIONAL, flHashIdent);
+                final InstanceIdentifier<FlowHashIdMap> flHashIdent =
+                        tableRef.augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, entryForRemove.getKey());
+                tx.delete(LogicalDatastoreType.OPERATIONAL, flowRef);
+                tx.delete(LogicalDatastoreType.OPERATIONAL, flHashIdent);
+            }
+        }
+
+        List<InstanceIdentifier<Flow>> notStatReportedConfigFlows() {
+            if (configFlows != null) {
+                final List<InstanceIdentifier<Flow>> returnList = new ArrayList<>(configFlows.size());
+                for (final Flow confFlow : configFlows) {
+                    final InstanceIdentifier<Flow> confFlowIdent = tableRef.child(Flow.class, confFlow.getKey());
+                    returnList.add(confFlowIdent);
                 }
+                return returnList;
             }
+            return Collections.emptyList();
         }
     }
 }