Fix bug 2450 - Statistics collection slow - performance
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatListenCommitFlow.java
index a19081db2527f7e2462633a783d8fcbab0e60b50..e17c45dc767f5050c4113c4df96b2c6aaf170232 100644 (file)
@@ -35,6 +35,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.no
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMapBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMapKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
@@ -139,8 +140,9 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                             .setAggregateFlowStatistics(new AggregateFlowStatisticsBuilder(notification).build()).build();
                         final InstanceIdentifier<FlowCapableNode> fNodeIdent = InstanceIdentifier.create(Nodes.class)
                                 .child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class);
-                        final InstanceIdentifier<AggregateFlowStatisticsData> tableStatRef = fNodeIdent
-                                .child(Table.class, table.getKey()).augmentation(AggregateFlowStatisticsData.class);
+                        final InstanceIdentifier<Table> tableRef = fNodeIdent.child(Table.class, table.getKey());
+                        final InstanceIdentifier<AggregateFlowStatisticsData> tableStatRef = tableRef
+                                .augmentation(AggregateFlowStatisticsData.class);
                         Optional<FlowCapableNode> fNode = Optional.absent();
                         try {
                             fNode = tx.read(LogicalDatastoreType.OPERATIONAL, fNodeIdent).checkedGet();
@@ -149,7 +151,8 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                             return;
                         }
                         if (fNode.isPresent()) {
-                            tx.put(LogicalDatastoreType.OPERATIONAL, tableStatRef, stats, true);
+                            ensureTable(tx, table.getId(), tableRef);
+                            tx.put(LogicalDatastoreType.OPERATIONAL, tableStatRef, stats);
                         }
                     }
                 }
@@ -157,6 +160,11 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
         });
     }
 
+    public void ensureTable(final ReadWriteTransaction tx, final Short tableId, final InstanceIdentifier<Table> tableRef) {
+        final Table tableNew = new TableBuilder().setId(tableId).build();
+        tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef, tableNew);
+    }
+
     @Override
     public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
         final TransactionId transId = notification.getTransactionId();
@@ -208,7 +216,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                     }
                 }
                 /* Notification for continue collecting statistics */
-                notifyToCollectNextStatistics(nodeIdent);
+                notifyToCollectNextStatistics(nodeIdent, transId);
             }
         });
     }
@@ -327,11 +335,12 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
             }
         }
 
-        private void ensureTable(final ReadWriteTransaction tx) {
+        private void ensureTableFowHashIdMapping(final ReadWriteTransaction tx) {
             if( ! tableEnsured) {
+                ensureTable(tx, tableKey.getId(), tableRef);
                 final FlowHashIdMapping emptyMapping = new FlowHashIdMappingBuilder()
                     .setFlowHashIdMap(Collections.<FlowHashIdMap> emptyList()).build();
-                tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef.augmentation(FlowHashIdMapping.class), emptyMapping, true);
+                tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef.augmentation(FlowHashIdMapping.class), emptyMapping);
                 tableEnsured = true;
             }
         }
@@ -387,7 +396,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
         }
 
         void reportFlow(final FlowAndStatisticsMapList flowStat, final ReadWriteTransaction trans) {
-            ensureTable(trans);
+            ensureTableFowHashIdMapping(trans);
             final FlowHashIdMapKey hashingKey = new FlowHashIdMapKey(buildFlowIdOperKey(flowStat));
             FlowKey flowKey = getFlowKeyAndRemoveHash(hashingKey);
             if (flowKey == null) {
@@ -421,6 +430,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
 
         void removeUnreportedFlows(final ReadWriteTransaction tx) {
             final InstanceIdentifier<Node> nodeIdent = tableRef.firstIdentifierOf(Node.class);
+            final List<InstanceIdentifier<Flow>> listMissingConfigFlows = notStatReportedConfigFlows();
             final Map<InstanceIdentifier<Flow>, Integer> nodeDeleteMap = mapNodesForDelete.get(nodeIdent);
             final Map<FlowHashIdMapKey, FlowId> listForRemove = getRemovalList();
             for (final Entry<FlowHashIdMapKey, FlowId> entryForRemove : listForRemove.entrySet()) {
@@ -433,6 +443,10 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                     } else {
                         nodeDeleteMap.remove(flowRef);
                     }
+                } else {
+                    if (listMissingConfigFlows.remove(flowRef)) {
+                        break; // we probably lost some multipart msg
+                    }
                 }
                 final InstanceIdentifier<FlowHashIdMap> flHashIdent =
                         tableRef.augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, entryForRemove.getKey());
@@ -440,6 +454,18 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                 tx.delete(LogicalDatastoreType.OPERATIONAL, flHashIdent);
             }
         }
+
+        List<InstanceIdentifier<Flow>> notStatReportedConfigFlows() {
+            if (configFlows != null) {
+                final List<InstanceIdentifier<Flow>> returnList = new ArrayList<>(configFlows.size());
+                for (final Flow confFlow : configFlows) {
+                    final InstanceIdentifier<Flow> confFlowIdent = tableRef.child(Flow.class, confFlow.getKey());
+                    returnList.add(confFlowIdent);
+                }
+                return returnList;
+            }
+            return Collections.emptyList();
+        }
     }
 }