Merge "Bug 2551 - Statistics collection of random node fails when large number if...
authorTom Pantelis <tpanteli@brocade.com>
Mon, 5 Jan 2015 22:20:24 +0000 (22:20 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 5 Jan 2015 22:20:24 +0000 (22:20 +0000)
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsManager.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitFlow.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitGroup.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitMeter.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitQueue.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatNodeRegistrationImpl.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatNotifyCommitPort.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatNotifyCommitTable.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java

index 751a68965dc69af4963cf8bd1197cce9590f056d..6124bdf6422d80b3a06591af55af44b623472144 100644 (file)
@@ -24,6 +24,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
@@ -53,14 +54,47 @@ public interface StatisticsManager extends AutoCloseable, TransactionChainListen
      * Internal {@link TransactionChainListener} joining all DS commits
      * to Set of chained changes for prevent often DataStore touches.
      */
-    public interface StatDataStoreOperation {
+    public abstract class StatDataStoreOperation {
+        public enum StatsManagerOperationType {
+            /**
+             * Operation will carry out work related to new node addition /
+             * update
+             */
+            NODE_UPDATE,
+            /**
+             * Operation will carry out work related to node removal
+             */
+            NODE_REMOVAL,
+            /**
+             * Operation will commit data to the operational data store
+             */
+            DATA_COMMIT_OPER_DS
+        }
+
+        private NodeId nodeId;
+        private StatsManagerOperationType operationType = StatsManagerOperationType.DATA_COMMIT_OPER_DS;
+
+        public StatDataStoreOperation(final StatsManagerOperationType operType, final NodeId id){
+            if(operType != null){
+                operationType = operType;
+            }
+            nodeId = id;
+        }
+
+        public final StatsManagerOperationType getType() {
+            return operationType;
+        }
+
+        public final NodeId getNodeId(){
+            return nodeId;
+        }
 
         /**
-         * Apply all read / write (put|merge) operation
-         * for DataStore
+         * Apply all read / write (put|merge) operation for DataStore
+         *
          * @param {@link ReadWriteTransaction} tx
          */
-        void applyOperation(ReadWriteTransaction tx);
+        public abstract void applyOperation(ReadWriteTransaction tx);
 
     }
 
index e17c45dc767f5050c4113c4df96b2c6aaf170232..49fe3bbefd25cbb6e4c30646e8e23c67d904532a 100644 (file)
@@ -25,6 +25,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.controller.md.statistics.manager.impl.helper.FlowComparator;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
@@ -120,7 +121,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
             return;
         }
         /* check flow Capable Node and write statistics */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
 
@@ -179,7 +180,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
             return;
         }
         /* add flow's statistics */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
                 final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
@@ -218,6 +219,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                 /* Notification for continue collecting statistics */
                 notifyToCollectNextStatistics(nodeIdent, transId);
             }
+
         });
     }
 
index 944ccfab5fd4433c37665037101d86e988a9b56d..538b9ef50565b8330e0960c0b192019fe9db959d 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatC
 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
@@ -103,7 +104,7 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
         }
 
         /* Don't block RPC Notification thread */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
                 final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier
@@ -157,7 +158,7 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
         }
 
         /* Don't block RPC Notification thread */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
                 /* Get and Validate TransactionCacheContainer */
@@ -211,7 +212,7 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
         }
 
         /* Don't block RPC Notification thread */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
 
index 2d5be71dcf914e41f0d8b9d61db9d5d7429ad67d..77d51c364adeeaf656275c1287757433095e9a6c 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatC
 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
@@ -101,7 +102,7 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
         }
 
         /* Don't block RPC Notification thread */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
 
@@ -157,7 +158,7 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
         }
 
         /* Don't block RPC Notification thread */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
                 /* Get and Validate TransactionCacheContainer */
@@ -211,7 +212,7 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
         }
 
         /* Don't block RPC Notification thread */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
 
index 77d7c7d312345a74451fc45c4c1f972ed23fd9c0..1bff3deba563d006e0890a16180cfb1b5602ec82 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
@@ -95,7 +96,7 @@ public class StatListenCommitQueue extends StatAbstractListenCommit<Queue, Opend
         }
 
         /* Don't block RPC Notification thread */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
 
index afb45e59f0a6c6effd7856e02bfbdc3e10a47116..4169725f4aec5256c5e3e6613b18cb0b2cb6b9ae 100644 (file)
@@ -23,6 +23,7 @@ import org.opendaylight.controller.md.statistics.manager.StatNodeRegistration;
 import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FeatureCapability;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
@@ -40,6 +41,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRem
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -114,7 +116,8 @@ public class StatNodeRegistrationImpl implements StatNodeRegistration, DataChang
         Preconditions.checkNotNull(data, "SwitchFeatures data for {} can not be null!", keyIdent);
         Preconditions.checkArgument(( ! keyIdent.isWildcarded()), "InstanceIdentifier is WildCarded!");
 
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.NODE_UPDATE,nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId()) {
+
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
 
@@ -150,7 +153,8 @@ public class StatNodeRegistrationImpl implements StatNodeRegistration, DataChang
         Preconditions.checkArgument(nodeIdent != null, "InstanceIdentifier can not be NULL!");
         Preconditions.checkArgument(( ! nodeIdent.isWildcarded()),
                 "InstanceIdentifier {} is WildCarded!", nodeIdent);
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.NODE_REMOVAL,nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId()) {
+
             @Override
             public void applyOperation(final ReadWriteTransaction tx) {
                 manager.disconnectedNodeUnregistration(nodeIdent);
index 131de73f9ded30b75c1d039a9a9d5a9ab22def47..65b5df0d6107b69b3be2d07fd7a8358b5ca0ee2e 100644 (file)
@@ -17,6 +17,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
@@ -82,7 +83,7 @@ public class StatNotifyCommitPort extends StatAbstractNotifyCommit<OpendaylightP
         final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier.create(Nodes.class)
                 .child(Node.class, new NodeKey(nodeId));
         /* Don't block RPC Notification thread */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction trans) {
                 final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
index 53bca87034e263bf0b4b73eb07bb6579dbf63043..2d730645ace5b09089d68bd14c29e774af25a628 100644 (file)
@@ -17,6 +17,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
@@ -81,7 +82,7 @@ public class StatNotifyCommitTable extends StatAbstractNotifyCommit<Opendaylight
             return;
         }
         /* Don't block RPC Notification thread */
-        manager.enqueue(new StatDataStoreOperation() {
+        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
             @Override
             public void applyOperation(final ReadWriteTransaction trans) {
                 final List<FlowTableAndStatisticsMap> tableStats = new ArrayList<FlowTableAndStatisticsMap>(10);
index 1d03e38c165c0a51e3a257730792389abd4d42a5..437c92f6a09e27d1646a893c644bf688c0ad4af7 100644 (file)
@@ -29,6 +29,7 @@ import org.opendaylight.controller.md.statistics.manager.StatPermCollector;
 import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
@@ -223,7 +224,18 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
    private synchronized void cleanDataStoreOperQueue() {
        // Drain all events, making sure any blocked threads are unblocked
        while (! dataStoreOperQueue.isEmpty()) {
-           dataStoreOperQueue.poll();
+           StatDataStoreOperation op = dataStoreOperQueue.poll();
+
+           // Execute the node removal clean up operation if queued in the
+           // operational queue.
+           if (op.getType() == StatsManagerOperationType.NODE_REMOVAL) {
+               try {
+                   LOG.debug("Node {} disconnected. Cleaning internal data.",op.getNodeId());
+                   op.applyOperation(null);
+               } catch (final Exception ex) {
+                   LOG.warn("Unhandled exception while cleaning up internal data of node [{}]",op.getNodeId());
+               }
+           }
        }
    }