When statistics manager is collecting stats and large number of switches disconnects from controller
,some time it causes OptimisticsLockFailedException while processing the existing multipart reply
present in dataStoreOperQueue. Exception occures because it tries to write data to the Node that
doesn't existing in the data store. While handling OptimisticsLockFailedException, statistics manager
flush existing data store related operation task from dataStoreOperQueue. This queue also holds Operational
task for cleaning up internal data of the disconnected nodes and because it flushes the queue, those
operational task don't get executed and statistics manager assumes that those nodes are yet connected
to the controller. When all the switches connects back with different mode (e.g previsouly Openflow13
but reconnects in OpenFlow10 mode), statistics collection of those node fails.
Change-Id: I50a44dfd20f90e3179bcd15bde67247da6565af7
Signed-off-by: Anil Vishnoi <vishnoianil@gmail.com>
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
* Internal {@link TransactionChainListener} joining all DS commits
* to Set of chained changes for prevent often DataStore touches.
*/
- public interface StatDataStoreOperation {
+ public abstract class StatDataStoreOperation {
+ public enum StatsManagerOperationType {
+ /**
+ * Operation will carry out work related to new node addition /
+ * update
+ */
+ NODE_UPDATE,
+ /**
+ * Operation will carry out work related to node removal
+ */
+ NODE_REMOVAL,
+ /**
+ * Operation will commit data to the operational data store
+ */
+ DATA_COMMIT_OPER_DS
+ }
+
+ private NodeId nodeId;
+ private StatsManagerOperationType operationType = StatsManagerOperationType.DATA_COMMIT_OPER_DS;
+
+ public StatDataStoreOperation(final StatsManagerOperationType operType, final NodeId id){
+ if(operType != null){
+ operationType = operType;
+ }
+ nodeId = id;
+ }
+
+ public final StatsManagerOperationType getType() {
+ return operationType;
+ }
+
+ public final NodeId getNodeId(){
+ return nodeId;
+ }
/**
- * Apply all read / write (put|merge) operation
- * for DataStore
+ * Apply all read / write (put|merge) operation for DataStore
+ *
* @param {@link ReadWriteTransaction} tx
*/
- void applyOperation(ReadWriteTransaction tx);
+ public abstract void applyOperation(ReadWriteTransaction tx);
}
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.md.statistics.manager.impl.helper.FlowComparator;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
return;
}
/* check flow Capable Node and write statistics */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
return;
}
/* add flow's statistics */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
/* Notification for continue collecting statistics */
notifyToCollectNextStatistics(nodeIdent, transId);
}
+
});
}
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
/* Get and Validate TransactionCacheContainer */
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
/* Get and Validate TransactionCacheContainer */
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FeatureCapability;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
Preconditions.checkNotNull(data, "SwitchFeatures data for {} can not be null!", keyIdent);
Preconditions.checkArgument(( ! keyIdent.isWildcarded()), "InstanceIdentifier is WildCarded!");
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.NODE_UPDATE,nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId()) {
+
@Override
public void applyOperation(final ReadWriteTransaction tx) {
Preconditions.checkArgument(nodeIdent != null, "InstanceIdentifier can not be NULL!");
Preconditions.checkArgument(( ! nodeIdent.isWildcarded()),
"InstanceIdentifier {} is WildCarded!", nodeIdent);
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.NODE_REMOVAL,nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId()) {
+
@Override
public void applyOperation(final ReadWriteTransaction tx) {
manager.disconnectedNodeUnregistration(nodeIdent);
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier.create(Nodes.class)
.child(Node.class, new NodeKey(nodeId));
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction trans) {
final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
return;
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction trans) {
final List<FlowTableAndStatisticsMap> tableStats = new ArrayList<FlowTableAndStatisticsMap>(10);
import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
private synchronized void cleanDataStoreOperQueue() {
// Drain all events, making sure any blocked threads are unblocked
while (! dataStoreOperQueue.isEmpty()) {
- dataStoreOperQueue.poll();
+ StatDataStoreOperation op = dataStoreOperQueue.poll();
+
+ // Execute the node removal clean up operation if queued in the
+ // operational queue.
+ if (op.getType() == StatsManagerOperationType.NODE_REMOVAL) {
+ try {
+ LOG.debug("Node {} disconnected. Cleaning internal data.",op.getNodeId());
+ op.applyOperation(null);
+ } catch (final Exception ex) {
+ LOG.warn("Unhandled exception while cleaning up internal data of node [{}]",op.getNodeId());
+ }
+ }
}
}