BUG-2078 Stats not being collected from all nodes, dangling nodes left in oper data... 64/11664/4
authorVaclav Demcak <vdemcak@cisco.com>
Mon, 29 Sep 2014 20:47:24 +0000 (22:47 +0200)
committerVaclav Demcak <vdemcak@cisco.com>
Mon, 29 Sep 2014 23:08:00 +0000 (01:08 +0200)
BUG-2049  DataStore failure in StatisticsManager

* fix performance issues
* fix occurrence zombie nodes
* fix NPE for StatCollector
* fix NPE for Meter/Group lists

Change-Id: I95a821aaf308bdb6e989c4a740aa014d5aaab4c2
Signed-off-by: Vaclav Demcak <vdemcak@cisco.com>
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManagerActivator.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitFlow.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitGroup.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitMeter.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitQueue.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatNotifyCommitPort.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatNotifyCommitTable.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatRpcMsgManagerImpl.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java

index 912a6eda4bf7999e9c465fc1ad196b04d744d10b..c505af49e6d20ec38023aee94c1361feb0a6dc0d 100644 (file)
@@ -31,7 +31,7 @@ public class StatisticsManagerActivator extends AbstractBindingAwareProvider {
 
     /* TODO move it to ConfigSubsystem */
     private static final long DEFAULT_MIN_REQUEST_NET_MONITOR_INTERVAL = 3000L;
-    private static final int MAX_NODES_FOR_COLLECTOR = 8;
+    private static final int MAX_NODES_FOR_COLLECTOR = 16;
 
     private StatisticsManager statsProvider;
 
index a19081db2527f7e2462633a783d8fcbab0e60b50..230425999e3215045ac632a4d291e8cd5507c943 100644 (file)
@@ -35,6 +35,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.no
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMapBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMapKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
@@ -139,8 +140,9 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                             .setAggregateFlowStatistics(new AggregateFlowStatisticsBuilder(notification).build()).build();
                         final InstanceIdentifier<FlowCapableNode> fNodeIdent = InstanceIdentifier.create(Nodes.class)
                                 .child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class);
-                        final InstanceIdentifier<AggregateFlowStatisticsData> tableStatRef = fNodeIdent
-                                .child(Table.class, table.getKey()).augmentation(AggregateFlowStatisticsData.class);
+                        final InstanceIdentifier<Table> tableRef = fNodeIdent.child(Table.class, table.getKey());
+                        final InstanceIdentifier<AggregateFlowStatisticsData> tableStatRef = tableRef
+                                .augmentation(AggregateFlowStatisticsData.class);
                         Optional<FlowCapableNode> fNode = Optional.absent();
                         try {
                             fNode = tx.read(LogicalDatastoreType.OPERATIONAL, fNodeIdent).checkedGet();
@@ -149,7 +151,8 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                             return;
                         }
                         if (fNode.isPresent()) {
-                            tx.put(LogicalDatastoreType.OPERATIONAL, tableStatRef, stats, true);
+                            ensureTable(tx, table.getId(), tableRef);
+                            tx.put(LogicalDatastoreType.OPERATIONAL, tableStatRef, stats);
                         }
                     }
                 }
@@ -157,6 +160,11 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
         });
     }
 
+    public void ensureTable(final ReadWriteTransaction tx, final Short tableId, final InstanceIdentifier<Table> tableRef) {
+        final Table tableNew = new TableBuilder().setId(tableId).build();
+        tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef, tableNew);
+    }
+
     @Override
     public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
         final TransactionId transId = notification.getTransactionId();
@@ -327,11 +335,12 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
             }
         }
 
-        private void ensureTable(final ReadWriteTransaction tx) {
+        private void ensureTableFowHashIdMapping(final ReadWriteTransaction tx) {
             if( ! tableEnsured) {
+                ensureTable(tx, tableKey.getId(), tableRef);
                 final FlowHashIdMapping emptyMapping = new FlowHashIdMappingBuilder()
                     .setFlowHashIdMap(Collections.<FlowHashIdMap> emptyList()).build();
-                tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef.augmentation(FlowHashIdMapping.class), emptyMapping, true);
+                tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef.augmentation(FlowHashIdMapping.class), emptyMapping);
                 tableEnsured = true;
             }
         }
@@ -387,7 +396,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
         }
 
         void reportFlow(final FlowAndStatisticsMapList flowStat, final ReadWriteTransaction trans) {
-            ensureTable(trans);
+            ensureTableFowHashIdMapping(trans);
             final FlowHashIdMapKey hashingKey = new FlowHashIdMapKey(buildFlowIdOperKey(flowStat));
             FlowKey flowKey = getFlowKeyAndRemoveHash(hashingKey);
             if (flowKey == null) {
@@ -421,6 +430,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
 
         void removeUnreportedFlows(final ReadWriteTransaction tx) {
             final InstanceIdentifier<Node> nodeIdent = tableRef.firstIdentifierOf(Node.class);
+            final List<InstanceIdentifier<Flow>> listMissingConfigFlows = notStatReportedConfigFlows();
             final Map<InstanceIdentifier<Flow>, Integer> nodeDeleteMap = mapNodesForDelete.get(nodeIdent);
             final Map<FlowHashIdMapKey, FlowId> listForRemove = getRemovalList();
             for (final Entry<FlowHashIdMapKey, FlowId> entryForRemove : listForRemove.entrySet()) {
@@ -433,6 +443,10 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                     } else {
                         nodeDeleteMap.remove(flowRef);
                     }
+                } else {
+                    if (listMissingConfigFlows.remove(flowRef)) {
+                        break; // we probably lost some multipart msg
+                    }
                 }
                 final InstanceIdentifier<FlowHashIdMap> flHashIdent =
                         tableRef.augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, entryForRemove.getKey());
@@ -440,6 +454,18 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                 tx.delete(LogicalDatastoreType.OPERATIONAL, flHashIdent);
             }
         }
+
+        List<InstanceIdentifier<Flow>> notStatReportedConfigFlows() {
+            if (configFlows != null) {
+                final List<InstanceIdentifier<Flow>> returnList = new ArrayList<>(configFlows.size());
+                for (final Flow confFlow : configFlows) {
+                    final InstanceIdentifier<Flow> confFlowIdent = tableRef.child(Flow.class, confFlow.getKey());
+                    returnList.add(confFlowIdent);
+                }
+                return returnList;
+            }
+            return Collections.emptyList();
+        }
     }
 }
 
index 41d97f080a4f9f001782298bb4795b835c1c473d..f351132f7f816bcc98151e9e6cb991c7ea911310 100644 (file)
@@ -155,7 +155,7 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
                     LOG.debug("Read Operational/DS for Node fail! {}", nodeIdent, e);
                 }
                 if (node.isPresent()) {
-                    tx.put(LogicalDatastoreType.OPERATIONAL, groupFeatureIdent, stats, true);
+                    tx.put(LogicalDatastoreType.OPERATIONAL, groupFeatureIdent, stats);
                 }
             }
         });
@@ -225,7 +225,7 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
                 LOG.debug("Read Operational/DS for FlowCapableNode fail! {}", fNodeIdent, e);
             }
             if (fNode.isPresent()) {
-                trans.put(LogicalDatastoreType.OPERATIONAL, gsIdent, stats, true);
+                trans.put(LogicalDatastoreType.OPERATIONAL, gsIdent, stats);
             }
         }
     }
@@ -271,8 +271,8 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
             LOG.trace("Read Operational/DS for FlowCapableNode fail! Node {} doesn't exist.", fNodeIdent);
             return;
         }
-        final List<Group> existGroups = fNode.get().getGroup().isEmpty()
-                ? Collections.<Group> emptyList() : fNode.get().getGroup();
+        final List<Group> existGroups = fNode.get().getGroup() != null
+                ? fNode.get().getGroup() : Collections.<Group> emptyList();
         /* Add all existed groups paths - no updated paths has to be removed */
         for (final Group group : existGroups) {
             if (deviceGroupKeys.remove(group.getKey())) {
index e2ae7637c6f35f4fe578c56d7ba755b48e7899b9..9c9de59a6ad8f53b562add5a7c8255cc2a54da19 100644 (file)
@@ -151,7 +151,7 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
                     LOG.debug("Read Operational/DS for Node fail! {}", nodeIdent, e);
                 }
                 if (node.isPresent()) {
-                    tx.put(LogicalDatastoreType.OPERATIONAL, meterFeatureIdent, stats, true);
+                    tx.put(LogicalDatastoreType.OPERATIONAL, meterFeatureIdent, stats);
                 }
             }
         });
@@ -211,7 +211,7 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
                 LOG.debug("Read Operational/DS for FlowCapableNode fail! {}", fNodeIdent, e);
             }
             if (fNode.isPresent()) {
-                trans.put(LogicalDatastoreType.OPERATIONAL, msIdent, stats, true);
+                trans.put(LogicalDatastoreType.OPERATIONAL, msIdent, stats);
             }
         }
     }
@@ -259,8 +259,8 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
             LOG.trace("Read Operational/DS for FlowCapableNode fail! Node {} doesn't exist.", fNodeIdent);
             return;
         }
-        final List<Meter> existMeters = fNode.get().getMeter().isEmpty()
-                ? Collections.<Meter> emptyList() : fNode.get().getMeter();
+        final List<Meter> existMeters = fNode.get().getMeter() != null
+                ? fNode.get().getMeter() : Collections.<Meter> emptyList();
         /* Add all existed groups paths - no updated paths has to be removed */
         for (final Meter meter : existMeters) {
             if (deviceMeterKeys.remove(meter.getKey())) {
index 29fe5d8e5e248960a5e2b0367b5ff00dc8e950b4..e336f01874e25f48b2a13d1221b06147bc7c9df0 100644 (file)
@@ -142,7 +142,7 @@ public class StatListenCommitQueue extends StatAbstractListenCommit<Queue, Opend
                     .child(NodeConnector.class, new NodeConnectorKey(queueEntry.getNodeConnectorId()))
                     .augmentation(FlowCapableNodeConnector.class)
                     .child(Queue.class, qKey).augmentation(FlowCapableNodeConnectorQueueStatisticsData.class);
-            trans.put(LogicalDatastoreType.OPERATIONAL, queueStatIdent, statBuild.build(), true);
+            trans.put(LogicalDatastoreType.OPERATIONAL, queueStatIdent, statBuild.build());
         }
     }
 }
index 72c10ee31655289f9fcffadc9a98116611e4cad0..fb124376b6be0875e1396559966e8c3da178a852 100644 (file)
@@ -23,6 +23,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
@@ -30,6 +31,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.F
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.flow.capable.node.connector.statistics.FlowCapableNodeConnectorStatistics;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.flow.capable.node.connector.statistics.FlowCapableNodeConnectorStatisticsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -125,12 +127,13 @@ public class StatNotifyCommitPort extends StatAbstractNotifyCommit<OpendaylightP
             return;
         }
         for (final NodeConnectorStatisticsAndPortNumberMap nConnectPort : portStats) {
-            final FlowCapableNodeConnectorStatisticsData stats = new FlowCapableNodeConnectorStatisticsDataBuilder()
-                    .setFlowCapableNodeConnectorStatistics(new FlowCapableNodeConnectorStatisticsBuilder(nConnectPort).build()).build();
+            final FlowCapableNodeConnectorStatistics stats = new FlowCapableNodeConnectorStatisticsBuilder(nConnectPort).build();
             final NodeConnectorKey key = new NodeConnectorKey(nConnectPort.getNodeConnectorId());
             final InstanceIdentifier<NodeConnector> nodeConnectorIdent = nodeIdent.child(NodeConnector.class, key);
             final InstanceIdentifier<FlowCapableNodeConnectorStatisticsData> nodeConnStatIdent = nodeConnectorIdent
                     .augmentation(FlowCapableNodeConnectorStatisticsData.class);
+            final InstanceIdentifier<FlowCapableNodeConnectorStatistics> flowCapNodeConnStatIdent =
+                    nodeConnStatIdent.child(FlowCapableNodeConnectorStatistics.class);
             Optional<NodeConnector> fNodeConector;
             try {
                 fNodeConector = tx.read(LogicalDatastoreType.OPERATIONAL, nodeConnectorIdent).checkedGet();
@@ -140,7 +143,9 @@ public class StatNotifyCommitPort extends StatAbstractNotifyCommit<OpendaylightP
                 fNodeConector = Optional.absent();
             }
             if (fNodeConector.isPresent()) {
-                tx.put(LogicalDatastoreType.OPERATIONAL, nodeConnStatIdent, stats);
+                tx.merge(LogicalDatastoreType.OPERATIONAL, nodeConnectorIdent, new NodeConnectorBuilder().setId(key.getId()).build());
+                tx.merge(LogicalDatastoreType.OPERATIONAL, nodeConnStatIdent, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
+                tx.put(LogicalDatastoreType.OPERATIONAL, flowCapNodeConnStatIdent, stats);
             }
         }
     }
index 3210934f88e1b02fe3036d0159b23ec7a018f21e..b41bd4973c99d3391509384edf7a7c49fbbf95cf 100644 (file)
@@ -20,12 +20,14 @@ import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatD
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsData;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsDataBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdate;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatistics;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatisticsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
@@ -125,13 +127,17 @@ public class StatNotifyCommitTable extends StatAbstractNotifyCommit<Opendaylight
             return;
         }
         for (final FlowTableAndStatisticsMap tableStat : tableStats) {
-            final FlowTableStatisticsData stats = new FlowTableStatisticsDataBuilder()
-                    .setFlowTableStatistics(new FlowTableStatisticsBuilder(tableStat).build()).build();
-
-            final InstanceIdentifier<FlowTableStatisticsData> tStatIdent = fNodeIdent
-                    .child(Table.class, new TableKey(tableStat.getTableId().getValue()))
+            final InstanceIdentifier<Table> tableIdent = fNodeIdent
+                    .child(Table.class, new TableKey(tableStat.getTableId().getValue()));
+            final Table table = new TableBuilder().setId(tableStat.getTableId().getValue()).build();
+            trans.merge(LogicalDatastoreType.OPERATIONAL, tableIdent, table);
+            final InstanceIdentifier<FlowTableStatisticsData> tableStatIdent = tableIdent
                     .augmentation(FlowTableStatisticsData.class);
-            trans.put(LogicalDatastoreType.OPERATIONAL, tStatIdent, stats, true);
+            trans.merge(LogicalDatastoreType.OPERATIONAL, tableStatIdent, new FlowTableStatisticsDataBuilder().build());
+
+            final FlowTableStatistics stats = new FlowTableStatisticsBuilder(tableStat).build();
+            final InstanceIdentifier<FlowTableStatistics> tStatIdent = tableStatIdent.child(FlowTableStatistics.class);
+            trans.put(LogicalDatastoreType.OPERATIONAL, tStatIdent, stats);
         }
     }
 }
index 80585548403f2ba2bf1b0e9498e26152acd6dace..e53f4941295f5a1891ad1e8dbdbf32ce2404d9b6 100644 (file)
@@ -15,10 +15,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
-import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
@@ -86,7 +84,6 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
 
     private final long maxLifeForRequest = 50; /* 50 second */
     private final int queueCapacity = 5000;
-    private final StatisticsManager manager;
 
     private final OpendaylightGroupStatisticsService groupStatsService;
     private final OpendaylightMeterStatisticsService meterStatsService;
@@ -101,7 +98,7 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
 
     public StatRpcMsgManagerImpl (final StatisticsManager manager,
             final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) {
-        this.manager = Preconditions.checkNotNull(manager, "StatisticManager can not be null!");
+        Preconditions.checkArgument(manager != null, "StatisticManager can not be null!");
         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
         groupStatsService = Preconditions.checkNotNull(
                 rpcRegistry.getRpcService(OpendaylightGroupStatisticsService.class),
@@ -322,30 +319,26 @@ public class StatRpcMsgManagerImpl implements StatRpcMsgManager {
 
     @Override
     public void getAggregateFlowStat(final NodeRef nodeRef, final TableId tableId) {
-
-        manager.enqueue(new StatDataStoreOperation() {
+        Preconditions.checkArgument(nodeRef != null, "NodeRef can not be null!");
+        Preconditions.checkArgument(tableId != null, "TableId can not be null!");
+        final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
 
             @Override
-            public void applyOperation(final ReadWriteTransaction tx) {
-                final RpcJobsQueue getAggregateFlowStat = new RpcJobsQueue() {
-                    @Override
-                    public Void call() throws Exception {
-                        final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
-                                new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
-                        builder.setNode(nodeRef);
-                        builder.setTableId(tableId);
-
-                        final TableBuilder tbuilder = new TableBuilder();
-                        tbuilder.setId(tableId.getValue());
-                        tbuilder.setKey(new TableKey(tableId.getValue()));
-                        registrationRpcFutureCallBack(flowStatsService
-                                .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef);
-                        return null;
-                    }
-                };
-                addGetAllStatJob(getAggregateFlowStat);
+            public Void call() throws Exception {
+                final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder builder =
+                        new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
+                builder.setNode(nodeRef);
+                builder.setTableId(tableId);
+
+                final TableBuilder tbuilder = new TableBuilder();
+                tbuilder.setId(tableId.getValue());
+                tbuilder.setKey(new TableKey(tableId.getValue()));
+                registrationRpcFutureCallBack(flowStatsService
+                        .getAggregateFlowStatisticsFromFlowTableForAllFlows(builder.build()), tbuilder.build(), nodeRef);
+                return null;
             }
-        });
+        };
+        addGetAllStatJob(getAggregateFlowStat);
     }
 
     @Override
index 247703f8acb3c60bb24df0f8ffc367867e13e1bc..0f8030f6204f18242d4b269549f805d11c5ab2d2 100644 (file)
@@ -67,8 +67,8 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
 
    private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
 
-   private static final int QUEUE_DEPTH = 1000;
-   private static final int MAX_BATCH = 1;
+   private static final int QUEUE_DEPTH = 5000;
+   private static final int MAX_BATCH = 100;
 
    private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
 
@@ -282,6 +282,7 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
    @Override
    public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
        flowListeningCommiter.cleanForDisconnect(nodeIdent);
+
        for (final StatPermCollector collector : statCollectors) {
            if (collector.disconnectedNodeUnregistration(nodeIdent)) {
                if ( ! collector.hasActiveNodes()) {