Bug-3085:StatisticsManager drops nodeadded operations when exceptions occur 60/19360/6
authorVaclav Demcak <vdemcak@cisco.com>
Mon, 25 May 2015 08:56:54 +0000 (10:56 +0200)
committerVaclav Demcak <vdemcak@cisco.com>
Mon, 25 May 2015 08:56:54 +0000 (10:56 +0200)
Currently nodeAdded and all stat notifications are fed to a queue as datastore operations and submitted in batches of 100.

If one of the tx chain submit fails the node added operation also gets discarded, along with the rest.

Queuing node added operations along with rest of the stat notifications causes stats for the new nodes to get collected quite late, as the number of devices increase.

Also when a node gets removed, the datastore operations that are queued for the removed node get submitted which might clash with the inventory-manager’s transaction.

So this patch ignores operations queued up for removed node.

Also it ignores for the stale stat notification operations which are collected for a node which got reconnected. This is done by marking the stat operations via UUIDs, created during node-added.

path6: rebase and fix conflicts

Change-Id: I5564627857c1834658ca0a0f2d530e129b7db953
Signed-off-by: Vaclav Demcak <vdemcak@cisco.com>
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/StatisticsManager.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatListenCommitFlow.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatListenCommitGroup.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatListenCommitMeter.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatListenCommitQueue.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatNodeRegistrationImpl.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatNotifyCommitPort.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatNotifyCommitTable.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatisticsManagerImpl.java

index e1c3fbc1ef57b390e2614b45ed4305bc1e20d136..3edbadcc4c38d9bc82d45f554f8bc0c9922fea9a 100644 (file)
@@ -9,7 +9,7 @@
 package org.opendaylight.openflowplugin.applications.statistics.manager;
 
 import java.util.List;
-
+import java.util.UUID;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
@@ -25,7 +25,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.q
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener;
@@ -73,12 +75,14 @@ public interface StatisticsManager extends AutoCloseable, TransactionChainListen
 
         private NodeId nodeId;
         private StatsManagerOperationType operationType = StatsManagerOperationType.DATA_COMMIT_OPER_DS;
+        private UUID nodeUUID;
 
         public StatDataStoreOperation(final StatsManagerOperationType operType, final NodeId id){
             if(operType != null){
                 operationType = operType;
             }
             nodeId = id;
+            nodeUUID = generatedUUIDForNode();
         }
 
         public final StatsManagerOperationType getType() {
@@ -89,6 +93,10 @@ public interface StatisticsManager extends AutoCloseable, TransactionChainListen
             return nodeId;
         }
 
+        public UUID getNodeUUID() {
+            return nodeUUID;
+        }
+
         /**
          * Apply all read / write (put|merge) operation for DataStore
          *
@@ -96,6 +104,41 @@ public interface StatisticsManager extends AutoCloseable, TransactionChainListen
          */
         public abstract void applyOperation(ReadWriteTransaction tx);
 
+        protected abstract UUID generatedUUIDForNode();
+
+        public InstanceIdentifier<Node> getNodeIdentifier() {
+            final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier.create(Nodes.class)
+                    .child(Node.class, new NodeKey(nodeId));
+            return nodeIdent;
+        }
+
+    }
+
+
+    class Pair<L,R> {
+
+        private final L left;
+        private final R right;
+
+        public Pair(L left, R right) {
+            this.left = left;
+            this.right = right;
+        }
+
+        public L getLeft() { return left; }
+        public R getRight() { return right; }
+
+        @Override
+        public int hashCode() { return left.hashCode() ^ right.hashCode(); }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof Pair)) return false;
+            Pair pairo = (Pair) o;
+            return this.left.equals(pairo.getLeft()) &&
+                    this.right.equals(pairo.getRight());
+        }
+
     }
 
     /**
@@ -222,5 +265,12 @@ public interface StatisticsManager extends AutoCloseable, TransactionChainListen
 
     StatisticsManagerConfig getConfiguration();
 
+    /**
+     * A unique UUID is generated with each node added by the statistics manager implementation in order to uniquely
+     * identify a session.
+     * @param nodeInstanceIdentifier
+     */
+    UUID getGeneratedUUIDForNode(InstanceIdentifier<Node> nodeInstanceIdentifier);
+
 }
 
index 59b215798ceee7308e1377a407a5d76b2342991e..71d34dca08b39ef7d024b2b7a642299231a71ad5 100644 (file)
@@ -18,6 +18,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -156,6 +157,11 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                     }
                 }
             }
+
+            @Override
+            public UUID generatedUUIDForNode() {
+                return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+            }
         });
     }
 
@@ -218,6 +224,11 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                 notifyToCollectNextStatistics(nodeIdent, transId);
             }
 
+            @Override
+            public UUID generatedUUIDForNode() {
+                return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+            }
+
         });
     }
 
index ed0b5531014b30fb23357e9dcbbc8ee2d1d5590f..f30984ffab4228dd7b9f5e847fab42b96f69cac5 100644 (file)
@@ -12,6 +12,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import java.util.UUID;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -140,6 +141,11 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
                 /* Notification for continue collecting statistics */
                 notifyToCollectNextStatistics(nodeIdent, transId);
             }
+
+            @Override
+            public UUID generatedUUIDForNode() {
+                return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+            }
         });
     }
 
@@ -194,6 +200,11 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
                     }
                 }
             }
+
+            @Override
+            public UUID generatedUUIDForNode() {
+                return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+            }
         });
     }
 
@@ -252,6 +263,10 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
                     notifyToCollectNextStatistics(nodeIdent, transId);
                 }
             }
+
+            public UUID generatedUUIDForNode() {
+                return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+            }
         });
     }
 
index 74d9040c487af118214122a2ee93f025d42b6033..e0a8ec3267b800c12115c40b66220e6759aed36c 100644 (file)
@@ -12,6 +12,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import java.util.UUID;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -140,6 +141,10 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
                 /* Notification for continue collecting statistics */
                 notifyToCollectNextStatistics(nodeIdent, transId);
             }
+
+            public UUID generatedUUIDForNode() {
+                return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+            }
         });
     }
 
@@ -194,6 +199,11 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
                     }
                 }
             }
+
+            @Override
+            public UUID generatedUUIDForNode() {
+                return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+            }
         });
     }
 
@@ -252,6 +262,11 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
                     notifyToCollectNextStatistics(nodeIdent, transId);
                 }
             }
+
+            @Override
+            public UUID generatedUUIDForNode() {
+                return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+            }
         });
     }
 
index 834939e6b6eaf58cf04df3223998b85a9a55784f..af8cb213f50fe8308a7b8be4aedb331c451ac060 100644 (file)
@@ -15,6 +15,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+
+import java.util.UUID;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -138,6 +140,11 @@ public class StatListenCommitQueue extends StatAbstractListenCommit<Queue, Opend
                 /* Notification for continue collecting statistics */
                 notifyToCollectNextStatistics(nodeIdent, transId);
             }
+
+            @Override
+            public UUID generatedUUIDForNode() {
+                return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+            }
         });
     }
 
index c8ce46d7c89ab94cd55872fa81f161618d406b80..6795f6f339c8ffb5a2a211a4564de45a70c7a112 100644 (file)
@@ -8,23 +8,21 @@
 
 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
-import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector.StatCapabTypes;
-import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation;
-import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
+import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FeatureCapability;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeUpdated;
@@ -41,16 +39,12 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRem
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
 /**
  * statistics-manager
  * org.opendaylight.openflowplugin.applications.statistics.manager.impl
@@ -116,36 +110,29 @@ public class StatNodeRegistrationImpl implements StatNodeRegistration, DataChang
         Preconditions.checkNotNull(data, "SwitchFeatures data for {} can not be null!", keyIdent);
         Preconditions.checkArgument(( ! keyIdent.isWildcarded()), "InstanceIdentifier is WildCarded!");
 
-        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.NODE_UPDATE,nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId()) {
-
-            @Override
-            public void applyOperation(final ReadWriteTransaction tx) {
-
-                final List<StatCapabTypes> statCapabTypes = new ArrayList<>();
-                Short maxCapTables = Short.valueOf("1");
-
-                final List<Class<? extends FeatureCapability>> capabilities = data.getCapabilities() != null
-                        ? data.getCapabilities() : Collections.<Class<? extends FeatureCapability>> emptyList();
-                for (final Class<? extends FeatureCapability> capability : capabilities) {
-                    if (FlowFeatureCapabilityTableStats.class.equals(capability)) {
-                        statCapabTypes.add(StatCapabTypes.TABLE_STATS);
-                    } else if (FlowFeatureCapabilityFlowStats.class.equals(capability)) {
-                        statCapabTypes.add(StatCapabTypes.FLOW_STATS);
-                    } else if (FlowFeatureCapabilityGroupStats.class.equals(capability)) {
-                        statCapabTypes.add(StatCapabTypes.GROUP_STATS);
-                    } else if (FlowFeatureCapabilityPortStats.class.equals(capability)) {
-                        statCapabTypes.add(StatCapabTypes.PORT_STATS);
-                    } else if (FlowFeatureCapabilityQueueStats.class.equals(capability)) {
-                        statCapabTypes.add(StatCapabTypes.QUEUE_STATS);
-                    }
-                }
-                maxCapTables = data.getMaxTables();
-
-                final Optional<Short> maxTables = Optional.<Short> of(maxCapTables);
-                manager.connectedNodeRegistration(nodeIdent,
-                        Collections.unmodifiableList(statCapabTypes), maxTables.get());
+        LOG.trace("STAT-MANAGER - connecting flow capable node {}", nodeIdent);
+        final List<StatCapabTypes> statCapabTypes = new ArrayList<>();
+        Short maxCapTables = Short.valueOf("1");
+
+        final List<Class<? extends FeatureCapability>> capabilities = data.getCapabilities() != null
+                ? data.getCapabilities() : Collections.<Class<? extends FeatureCapability>> emptyList();
+        for (final Class<? extends FeatureCapability> capability : capabilities) {
+            if (FlowFeatureCapabilityTableStats.class.equals(capability)) {
+                statCapabTypes.add(StatCapabTypes.TABLE_STATS);
+            } else if (FlowFeatureCapabilityFlowStats.class.equals(capability)) {
+                statCapabTypes.add(StatCapabTypes.FLOW_STATS);
+            } else if (FlowFeatureCapabilityGroupStats.class.equals(capability)) {
+                statCapabTypes.add(StatCapabTypes.GROUP_STATS);
+            } else if (FlowFeatureCapabilityPortStats.class.equals(capability)) {
+                statCapabTypes.add(StatCapabTypes.PORT_STATS);
+            } else if (FlowFeatureCapabilityQueueStats.class.equals(capability)) {
+                statCapabTypes.add(StatCapabTypes.QUEUE_STATS);
             }
-        });
+        }
+        maxCapTables = data.getMaxTables();
+
+        final Optional<Short> maxTables = Optional.<Short> of(maxCapTables);
+        manager.connectedNodeRegistration(nodeIdent, Collections.unmodifiableList(statCapabTypes), maxTables.get());
     }
 
     @Override
@@ -153,13 +140,8 @@ public class StatNodeRegistrationImpl implements StatNodeRegistration, DataChang
         Preconditions.checkArgument(nodeIdent != null, "InstanceIdentifier can not be NULL!");
         Preconditions.checkArgument(( ! nodeIdent.isWildcarded()),
                 "InstanceIdentifier {} is WildCarded!", nodeIdent);
-        manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.NODE_REMOVAL,nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId()) {
-
-            @Override
-            public void applyOperation(final ReadWriteTransaction tx) {
-                manager.disconnectedNodeUnregistration(nodeIdent);
-            }
-        });
+        LOG.trace("STAT-MANAGER - disconnect flow capable node {}", nodeIdent);
+        manager.disconnectedNodeUnregistration(nodeIdent);
     }
 
 
@@ -181,6 +163,7 @@ public class StatNodeRegistrationImpl implements StatNodeRegistration, DataChang
         final InstanceIdentifier<Node> nodeIdent =
                 nodeRefIdent.firstIdentifierOf(Node.class);
         if (nodeIdent != null) {
+            LOG.debug("Received onNodeRemoved for node:{} ", nodeIdent);
             disconnectFlowCapableNode(nodeIdent);
         }
     }
index ee7f0ef9fc5e0a91702c838c9fb723cbc6c6f8ff..61a8dc03ca149fbbdddbbaa1e3f69068ace25f64 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
 import java.util.ArrayList;
 import java.util.List;
 
+import java.util.UUID;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -108,6 +109,11 @@ public class StatNotifyCommitPort extends StatAbstractNotifyCommit<OpendaylightP
                  * and they are small - don't need to wait for whole apply operation*/
                 notifyToCollectNextStatistics(nodeIdent, transId);
             }
+
+            @Override
+            public UUID generatedUUIDForNode() {
+                return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+            }
         });
     }
 
index fede1abc76ae3bc5eaee2604015f9894a84aa0a3..d5bd27e23066d6ecf0797fb35caf5edb8969433e 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
 import java.util.ArrayList;
 import java.util.List;
 
+import java.util.UUID;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -108,6 +109,11 @@ public class StatNotifyCommitTable extends StatAbstractNotifyCommit<Opendaylight
                  * and they are small - don't need to wait to whole apply operation */
                 notifyToCollectNextStatistics(nodeIdent, transId);
             }
+
+            @Override
+            public UUID generatedUUIDForNode() {
+                return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+            }
         });
     }
 
index 347941518d9566deb960194267071299c860247e..d4779c931fab57b7918dde77cda941ebf5075237 100644 (file)
@@ -8,15 +8,20 @@
 
 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
 
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadFactory;
-
+import java.util.concurrent.atomic.AtomicInteger;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -31,7 +36,6 @@ import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermC
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector.StatCapabTypes;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
-import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
@@ -48,9 +52,6 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 /**
 * statistics-manager
 * org.opendaylight.openflowplugin.applications.statistics.manager.impl
@@ -73,8 +74,11 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
    private static final int MAX_BATCH = 100;
 
    private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
+    private final Map<InstanceIdentifier<Node>, Pair<StatPermCollector, UUID>> nodeCollectorMap = new ConcurrentHashMap<>();
+    private AtomicInteger numNodesBeingCollected = new AtomicInteger(0);
 
-   private final DataBroker dataBroker;
+
+    private final DataBroker dataBroker;
    private final ExecutorService statRpcMsgManagerExecutor;
    private final ExecutorService statDataStoreOperationServ;
    private StatRpcMsgManager rpcMsgManager;
@@ -173,9 +177,18 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
 
                int ops = 0;
                do {
-                   op.applyOperation(tx);
+                   Pair<StatPermCollector, UUID> statPermCollectorUUIDPair = nodeCollectorMap.get(op.getNodeIdentifier());
+                   if (statPermCollectorUUIDPair != null && statPermCollectorUUIDPair.getRight().equals(op.getNodeUUID())) {
+                       // dont apply operations for nodes which have been disconnected or if there uuids do not match
+                       // this can happen if operations are queued and node is removed.
+                       // if the uuids dont match, it means that the stat operation are stale and belong to the same node
+                       // which got disconnected and connected again.
+                       op.applyOperation(tx);
+                       ops++;
+                   } else {
+                       LOG.debug("{} not found or UUID mismatch for statistics datastore operation", op.getNodeIdentifier());
+                   }
 
-                   ops++;
                    if (ops < MAX_BATCH) {
                        op = dataStoreOperQueue.poll();
                    } else {
@@ -185,7 +198,7 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
 
                LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
 
-                   tx.submit().checkedGet();
+               tx.submit().checkedGet();
            } catch (final InterruptedException e) {
                LOG.warn("Stat Manager DS Operation thread interupted!", e);
                finishing = true;
@@ -203,19 +216,7 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
    private synchronized void cleanDataStoreOperQueue() {
        // Drain all events, making sure any blocked threads are unblocked
        while (! dataStoreOperQueue.isEmpty()) {
-           StatDataStoreOperation op = dataStoreOperQueue.poll();
-
-           // Execute the node removal clean up operation if queued in the
-           // operational queue.
-           if (op.getType() == StatsManagerOperationType.NODE_REMOVAL) {
-               try {
-                   LOG.debug("Node {} disconnected. Cleaning internal data.",op.getNodeId());
-                   op.applyOperation(null);
-               } catch (final Exception e) {
-                   LOG.warn("Unhandled exception while cleaning up internal data of node [{}]. "
-                           + "Exception {}",op.getNodeId(), e);
-               }
-           }
+           dataStoreOperQueue.poll();
        }
    }
 
@@ -249,52 +250,81 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
        }
    }
 
-   @Override
-   public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
-           final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
-       for (final StatPermCollector collector : statCollectors) {
-           if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
-               return;
-           }
-       }
-       synchronized (statCollectorLock) {
-           for (final StatPermCollector collector : statCollectors) {
-               if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
-                   return;
-               }
-           }
-           final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
-                   statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
-                   statManagerConfig.getMaxNodesForCollector());
-           final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
-           newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
-           statCollectorsNew.add(newCollector);
-           statCollectors = Collections.unmodifiableList(statCollectorsNew);
-       }
-   }
+    @Override
+    public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
+            final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
+
+
+        Pair<StatPermCollector, UUID> collectorUUIDPair = nodeCollectorMap.get(nodeIdent);
+        if (collectorUUIDPair == null) {
+            // no collector contains this node,
+            // check if one of the collectors can accommodate it
+            // if no then add a new collector
+
+            synchronized(statCollectorLock) {
+                for (int i = statCollectors.size() - 1; i >= 0; i--) {
+                    // start from back of the list as most likely previous ones might be full
+                    final StatPermCollector aCollector = statCollectors.get(i);
+                    if (aCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
+                        // if the collector returns true after adding node, then return
+                        nodeCollectorMap.put(nodeIdent, new Pair(aCollector, UUID.randomUUID()));
+                        LOG.debug("NodeAdded: Num Nodes Registered with StatisticsManager:{}",
+                                numNodesBeingCollected.incrementAndGet());
+                        return;
+                    }
+                }
+                // no collector was able to add this node
+                LOG.info("No existing collector found for new node. Creating a new collector for {}", nodeIdent);
+                final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
+                        statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
+                        statManagerConfig.getMaxNodesForCollector());
+
+                final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
+                statCollectorsNew.add(newCollector);
+                statCollectors = Collections.unmodifiableList(statCollectorsNew);
+                nodeCollectorMap.put(nodeIdent, new Pair(newCollector, UUID.randomUUID()));
+                LOG.debug("NodeAdded: Num Nodes Registered with StatisticsManager:{}", numNodesBeingCollected.incrementAndGet());
+
+                newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
+            }
+
+
+        } else {
+            // add to the collector, even if it rejects it.
+            collectorUUIDPair.getLeft().connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
+        }
+    }
 
-   @Override
-   public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
-       flowListeningCommiter.cleanForDisconnect(nodeIdent);
 
-       for (final StatPermCollector collector : statCollectors) {
-           if (collector.disconnectedNodeUnregistration(nodeIdent)) {
-               if ( ! collector.hasActiveNodes()) {
-                   synchronized (statCollectorLock) {
-                       if (collector.hasActiveNodes()) {
-                           return;
-                       }
-                       final List<StatPermCollector> newStatColl =
-                               new ArrayList<>(statCollectors);
-                       newStatColl.remove(collector);
-                       statCollectors = Collections.unmodifiableList(newStatColl);
-                   }
-               }
-               return;
-           }
-       }
-       LOG.debug("Node {} has not been removed.", nodeIdent);
-   }
+    @Override
+    public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
+        flowListeningCommiter.cleanForDisconnect(nodeIdent);
+
+        Pair<StatPermCollector, UUID> collectorUUIDPair = nodeCollectorMap.get(nodeIdent);
+        StatPermCollector collector = collectorUUIDPair.getLeft();
+        if (collector != null) {
+            nodeCollectorMap.remove(nodeIdent);
+            LOG.debug("NodeRemoved: Num Nodes Registered with StatisticsManager:{}", numNodesBeingCollected.decrementAndGet());
+
+            if (collector.disconnectedNodeUnregistration(nodeIdent)) {
+                if (!collector.hasActiveNodes()) {
+                    synchronized (statCollectorLock) {
+                        if (collector.hasActiveNodes()) {
+                            return;
+                        }
+                        final List<StatPermCollector> newStatColl = new ArrayList<>(statCollectors);
+                        newStatColl.remove(collector);
+                        statCollectors = Collections.unmodifiableList(newStatColl);
+                    }
+                }
+                LOG.info("Node:{} successfully removed by StatisticsManager ", nodeIdent);
+            } else {
+                LOG.error("Collector not disconnecting for node, no operations will be committed for this node:{}", nodeIdent);
+            }
+        } else {
+            LOG.error("Received node removed for {}, but unable to find it in nodeCollectorMap", nodeIdent);
+        }
+    }
 
    @Override
    public void registerAdditionalNodeFeature(final InstanceIdentifier<Node> nodeIdent,
@@ -353,5 +383,15 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
     public StatisticsManagerConfig getConfiguration() {
         return statManagerConfig;
     }
+
+    @Override
+    public UUID getGeneratedUUIDForNode(InstanceIdentifier<Node> nodeInstanceIdentifier) {
+        Pair<StatPermCollector, UUID> permCollectorUUIDPair = nodeCollectorMap.get(nodeInstanceIdentifier);
+        if (permCollectorUUIDPair != null) {
+            return permCollectorUUIDPair.getRight();
+        }
+        // we dont want to mark operations with null uuid and get NPEs later. So mark them with invalid ones
+        return UUID.fromString("invalid-uuid");
+    }
 }