Currently nodeAdded and all stat notifications are fed to a queue as datastore operations and submitted in batches of 100.
If one of the tx chain submit fails the node added operation also gets discarded, along with the rest.
Queuing node added operations along with rest of the stat notifications causes stats for the new nodes to get collected quite late, as the number of devices increase.
Also when a node gets removed, the datastore operations that are queued for the removed node get submitted which might clash with the inventory-manager’s transaction.
So this patch ignores operations queued up for removed node.
Also it ignores for the stale stat notification operations which are collected for a node which got reconnected. This is done by marking the stat operations via UUIDs, created during node-added.
path6: rebase and fix conflicts
Change-Id: I5564627857c1834658ca0a0f2d530e129b7db953
Signed-off-by: Vaclav Demcak <vdemcak@cisco.com>
package org.opendaylight.openflowplugin.applications.statistics.manager;
import java.util.List;
-
+import java.util.UUID;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener;
private NodeId nodeId;
private StatsManagerOperationType operationType = StatsManagerOperationType.DATA_COMMIT_OPER_DS;
+ private UUID nodeUUID;
public StatDataStoreOperation(final StatsManagerOperationType operType, final NodeId id){
if(operType != null){
operationType = operType;
}
nodeId = id;
+ nodeUUID = generatedUUIDForNode();
}
public final StatsManagerOperationType getType() {
return nodeId;
}
+ public UUID getNodeUUID() {
+ return nodeUUID;
+ }
+
/**
* Apply all read / write (put|merge) operation for DataStore
*
*/
public abstract void applyOperation(ReadWriteTransaction tx);
+ protected abstract UUID generatedUUIDForNode();
+
+ public InstanceIdentifier<Node> getNodeIdentifier() {
+ final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier.create(Nodes.class)
+ .child(Node.class, new NodeKey(nodeId));
+ return nodeIdent;
+ }
+
+ }
+
+
+ class Pair<L,R> {
+
+ private final L left;
+ private final R right;
+
+ public Pair(L left, R right) {
+ this.left = left;
+ this.right = right;
+ }
+
+ public L getLeft() { return left; }
+ public R getRight() { return right; }
+
+ @Override
+ public int hashCode() { return left.hashCode() ^ right.hashCode(); }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof Pair)) return false;
+ Pair pairo = (Pair) o;
+ return this.left.equals(pairo.getLeft()) &&
+ this.right.equals(pairo.getRight());
+ }
+
}
/**
StatisticsManagerConfig getConfiguration();
+ /**
+ * A unique UUID is generated with each node added by the statistics manager implementation in order to uniquely
+ * identify a session.
+ * @param nodeInstanceIdentifier
+ */
+ UUID getGeneratedUUIDForNode(InstanceIdentifier<Node> nodeInstanceIdentifier);
+
}
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
}
}
}
+
+ @Override
+ public UUID generatedUUIDForNode() {
+ return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+ }
});
}
notifyToCollectNextStatistics(nodeIdent, transId);
}
+ @Override
+ public UUID generatedUUIDForNode() {
+ return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+ }
+
});
}
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
/* Notification for continue collecting statistics */
notifyToCollectNextStatistics(nodeIdent, transId);
}
+
+ @Override
+ public UUID generatedUUIDForNode() {
+ return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+ }
});
}
}
}
}
+
+ @Override
+ public UUID generatedUUIDForNode() {
+ return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+ }
});
}
notifyToCollectNextStatistics(nodeIdent, transId);
}
}
+
+ public UUID generatedUUIDForNode() {
+ return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+ }
});
}
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
/* Notification for continue collecting statistics */
notifyToCollectNextStatistics(nodeIdent, transId);
}
+
+ public UUID generatedUUIDForNode() {
+ return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+ }
});
}
}
}
}
+
+ @Override
+ public UUID generatedUUIDForNode() {
+ return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+ }
});
}
notifyToCollectNextStatistics(nodeIdent, transId);
}
}
+
+ @Override
+ public UUID generatedUUIDForNode() {
+ return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+ }
});
}
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+
+import java.util.UUID;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
/* Notification for continue collecting statistics */
notifyToCollectNextStatistics(nodeIdent, transId);
}
+
+ @Override
+ public UUID generatedUUIDForNode() {
+ return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+ }
});
}
package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
-
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
-import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector.StatCapabTypes;
-import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation;
-import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
+import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FeatureCapability;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
/**
* statistics-manager
* org.opendaylight.openflowplugin.applications.statistics.manager.impl
Preconditions.checkNotNull(data, "SwitchFeatures data for {} can not be null!", keyIdent);
Preconditions.checkArgument(( ! keyIdent.isWildcarded()), "InstanceIdentifier is WildCarded!");
- manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.NODE_UPDATE,nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId()) {
-
- @Override
- public void applyOperation(final ReadWriteTransaction tx) {
-
- final List<StatCapabTypes> statCapabTypes = new ArrayList<>();
- Short maxCapTables = Short.valueOf("1");
-
- final List<Class<? extends FeatureCapability>> capabilities = data.getCapabilities() != null
- ? data.getCapabilities() : Collections.<Class<? extends FeatureCapability>> emptyList();
- for (final Class<? extends FeatureCapability> capability : capabilities) {
- if (FlowFeatureCapabilityTableStats.class.equals(capability)) {
- statCapabTypes.add(StatCapabTypes.TABLE_STATS);
- } else if (FlowFeatureCapabilityFlowStats.class.equals(capability)) {
- statCapabTypes.add(StatCapabTypes.FLOW_STATS);
- } else if (FlowFeatureCapabilityGroupStats.class.equals(capability)) {
- statCapabTypes.add(StatCapabTypes.GROUP_STATS);
- } else if (FlowFeatureCapabilityPortStats.class.equals(capability)) {
- statCapabTypes.add(StatCapabTypes.PORT_STATS);
- } else if (FlowFeatureCapabilityQueueStats.class.equals(capability)) {
- statCapabTypes.add(StatCapabTypes.QUEUE_STATS);
- }
- }
- maxCapTables = data.getMaxTables();
-
- final Optional<Short> maxTables = Optional.<Short> of(maxCapTables);
- manager.connectedNodeRegistration(nodeIdent,
- Collections.unmodifiableList(statCapabTypes), maxTables.get());
+ LOG.trace("STAT-MANAGER - connecting flow capable node {}", nodeIdent);
+ final List<StatCapabTypes> statCapabTypes = new ArrayList<>();
+ Short maxCapTables = Short.valueOf("1");
+
+ final List<Class<? extends FeatureCapability>> capabilities = data.getCapabilities() != null
+ ? data.getCapabilities() : Collections.<Class<? extends FeatureCapability>> emptyList();
+ for (final Class<? extends FeatureCapability> capability : capabilities) {
+ if (FlowFeatureCapabilityTableStats.class.equals(capability)) {
+ statCapabTypes.add(StatCapabTypes.TABLE_STATS);
+ } else if (FlowFeatureCapabilityFlowStats.class.equals(capability)) {
+ statCapabTypes.add(StatCapabTypes.FLOW_STATS);
+ } else if (FlowFeatureCapabilityGroupStats.class.equals(capability)) {
+ statCapabTypes.add(StatCapabTypes.GROUP_STATS);
+ } else if (FlowFeatureCapabilityPortStats.class.equals(capability)) {
+ statCapabTypes.add(StatCapabTypes.PORT_STATS);
+ } else if (FlowFeatureCapabilityQueueStats.class.equals(capability)) {
+ statCapabTypes.add(StatCapabTypes.QUEUE_STATS);
}
- });
+ }
+ maxCapTables = data.getMaxTables();
+
+ final Optional<Short> maxTables = Optional.<Short> of(maxCapTables);
+ manager.connectedNodeRegistration(nodeIdent, Collections.unmodifiableList(statCapabTypes), maxTables.get());
}
@Override
Preconditions.checkArgument(nodeIdent != null, "InstanceIdentifier can not be NULL!");
Preconditions.checkArgument(( ! nodeIdent.isWildcarded()),
"InstanceIdentifier {} is WildCarded!", nodeIdent);
- manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.NODE_REMOVAL,nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId()) {
-
- @Override
- public void applyOperation(final ReadWriteTransaction tx) {
- manager.disconnectedNodeUnregistration(nodeIdent);
- }
- });
+ LOG.trace("STAT-MANAGER - disconnect flow capable node {}", nodeIdent);
+ manager.disconnectedNodeUnregistration(nodeIdent);
}
final InstanceIdentifier<Node> nodeIdent =
nodeRefIdent.firstIdentifierOf(Node.class);
if (nodeIdent != null) {
+ LOG.debug("Received onNodeRemoved for node:{} ", nodeIdent);
disconnectFlowCapableNode(nodeIdent);
}
}
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
* and they are small - don't need to wait for whole apply operation*/
notifyToCollectNextStatistics(nodeIdent, transId);
}
+
+ @Override
+ public UUID generatedUUIDForNode() {
+ return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+ }
});
}
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
* and they are small - don't need to wait to whole apply operation */
notifyToCollectNextStatistics(nodeIdent, transId);
}
+
+ @Override
+ public UUID generatedUUIDForNode() {
+ return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+ }
});
}
package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
-
+import java.util.concurrent.atomic.AtomicInteger;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector.StatCapabTypes;
import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager;
import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
-import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
/**
* statistics-manager
* org.opendaylight.openflowplugin.applications.statistics.manager.impl
private static final int MAX_BATCH = 100;
private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
+ private final Map<InstanceIdentifier<Node>, Pair<StatPermCollector, UUID>> nodeCollectorMap = new ConcurrentHashMap<>();
+ private AtomicInteger numNodesBeingCollected = new AtomicInteger(0);
- private final DataBroker dataBroker;
+
+ private final DataBroker dataBroker;
private final ExecutorService statRpcMsgManagerExecutor;
private final ExecutorService statDataStoreOperationServ;
private StatRpcMsgManager rpcMsgManager;
int ops = 0;
do {
- op.applyOperation(tx);
+ Pair<StatPermCollector, UUID> statPermCollectorUUIDPair = nodeCollectorMap.get(op.getNodeIdentifier());
+ if (statPermCollectorUUIDPair != null && statPermCollectorUUIDPair.getRight().equals(op.getNodeUUID())) {
+ // dont apply operations for nodes which have been disconnected or if there uuids do not match
+ // this can happen if operations are queued and node is removed.
+ // if the uuids dont match, it means that the stat operation are stale and belong to the same node
+ // which got disconnected and connected again.
+ op.applyOperation(tx);
+ ops++;
+ } else {
+ LOG.debug("{} not found or UUID mismatch for statistics datastore operation", op.getNodeIdentifier());
+ }
- ops++;
if (ops < MAX_BATCH) {
op = dataStoreOperQueue.poll();
} else {
LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
- tx.submit().checkedGet();
+ tx.submit().checkedGet();
} catch (final InterruptedException e) {
LOG.warn("Stat Manager DS Operation thread interupted!", e);
finishing = true;
private synchronized void cleanDataStoreOperQueue() {
// Drain all events, making sure any blocked threads are unblocked
while (! dataStoreOperQueue.isEmpty()) {
- StatDataStoreOperation op = dataStoreOperQueue.poll();
-
- // Execute the node removal clean up operation if queued in the
- // operational queue.
- if (op.getType() == StatsManagerOperationType.NODE_REMOVAL) {
- try {
- LOG.debug("Node {} disconnected. Cleaning internal data.",op.getNodeId());
- op.applyOperation(null);
- } catch (final Exception e) {
- LOG.warn("Unhandled exception while cleaning up internal data of node [{}]. "
- + "Exception {}",op.getNodeId(), e);
- }
- }
+ dataStoreOperQueue.poll();
}
}
}
}
- @Override
- public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
- final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
- for (final StatPermCollector collector : statCollectors) {
- if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
- return;
- }
- }
- synchronized (statCollectorLock) {
- for (final StatPermCollector collector : statCollectors) {
- if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
- return;
- }
- }
- final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
- statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
- statManagerConfig.getMaxNodesForCollector());
- final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
- newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
- statCollectorsNew.add(newCollector);
- statCollectors = Collections.unmodifiableList(statCollectorsNew);
- }
- }
+ @Override
+ public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
+ final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
+
+
+ Pair<StatPermCollector, UUID> collectorUUIDPair = nodeCollectorMap.get(nodeIdent);
+ if (collectorUUIDPair == null) {
+ // no collector contains this node,
+ // check if one of the collectors can accommodate it
+ // if no then add a new collector
+
+ synchronized(statCollectorLock) {
+ for (int i = statCollectors.size() - 1; i >= 0; i--) {
+ // start from back of the list as most likely previous ones might be full
+ final StatPermCollector aCollector = statCollectors.get(i);
+ if (aCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
+ // if the collector returns true after adding node, then return
+ nodeCollectorMap.put(nodeIdent, new Pair(aCollector, UUID.randomUUID()));
+ LOG.debug("NodeAdded: Num Nodes Registered with StatisticsManager:{}",
+ numNodesBeingCollected.incrementAndGet());
+ return;
+ }
+ }
+ // no collector was able to add this node
+ LOG.info("No existing collector found for new node. Creating a new collector for {}", nodeIdent);
+ final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
+ statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
+ statManagerConfig.getMaxNodesForCollector());
+
+ final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
+ statCollectorsNew.add(newCollector);
+ statCollectors = Collections.unmodifiableList(statCollectorsNew);
+ nodeCollectorMap.put(nodeIdent, new Pair(newCollector, UUID.randomUUID()));
+ LOG.debug("NodeAdded: Num Nodes Registered with StatisticsManager:{}", numNodesBeingCollected.incrementAndGet());
+
+ newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
+ }
+
+
+ } else {
+ // add to the collector, even if it rejects it.
+ collectorUUIDPair.getLeft().connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
+ }
+ }
- @Override
- public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
- flowListeningCommiter.cleanForDisconnect(nodeIdent);
- for (final StatPermCollector collector : statCollectors) {
- if (collector.disconnectedNodeUnregistration(nodeIdent)) {
- if ( ! collector.hasActiveNodes()) {
- synchronized (statCollectorLock) {
- if (collector.hasActiveNodes()) {
- return;
- }
- final List<StatPermCollector> newStatColl =
- new ArrayList<>(statCollectors);
- newStatColl.remove(collector);
- statCollectors = Collections.unmodifiableList(newStatColl);
- }
- }
- return;
- }
- }
- LOG.debug("Node {} has not been removed.", nodeIdent);
- }
+ @Override
+ public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
+ flowListeningCommiter.cleanForDisconnect(nodeIdent);
+
+ Pair<StatPermCollector, UUID> collectorUUIDPair = nodeCollectorMap.get(nodeIdent);
+ StatPermCollector collector = collectorUUIDPair.getLeft();
+ if (collector != null) {
+ nodeCollectorMap.remove(nodeIdent);
+ LOG.debug("NodeRemoved: Num Nodes Registered with StatisticsManager:{}", numNodesBeingCollected.decrementAndGet());
+
+ if (collector.disconnectedNodeUnregistration(nodeIdent)) {
+ if (!collector.hasActiveNodes()) {
+ synchronized (statCollectorLock) {
+ if (collector.hasActiveNodes()) {
+ return;
+ }
+ final List<StatPermCollector> newStatColl = new ArrayList<>(statCollectors);
+ newStatColl.remove(collector);
+ statCollectors = Collections.unmodifiableList(newStatColl);
+ }
+ }
+ LOG.info("Node:{} successfully removed by StatisticsManager ", nodeIdent);
+ } else {
+ LOG.error("Collector not disconnecting for node, no operations will be committed for this node:{}", nodeIdent);
+ }
+ } else {
+ LOG.error("Received node removed for {}, but unable to find it in nodeCollectorMap", nodeIdent);
+ }
+ }
@Override
public void registerAdditionalNodeFeature(final InstanceIdentifier<Node> nodeIdent,
public StatisticsManagerConfig getConfiguration() {
return statManagerConfig;
}
+
+ @Override
+ public UUID getGeneratedUUIDForNode(InstanceIdentifier<Node> nodeInstanceIdentifier) {
+ Pair<StatPermCollector, UUID> permCollectorUUIDPair = nodeCollectorMap.get(nodeInstanceIdentifier);
+ if (permCollectorUUIDPair != null) {
+ return permCollectorUUIDPair.getRight();
+ }
+ // we dont want to mark operations with null uuid and get NPEs later. So mark them with invalid ones
+ return UUID.fromString("invalid-uuid");
+ }
}