import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
* Internal {@link TransactionChainListener} joining all DS commits
* to Set of chained changes for prevent often DataStore touches.
*/
- public interface StatDataStoreOperation {
+ public abstract class StatDataStoreOperation {
+ public enum StatsManagerOperationType {
+ /**
+ * Operation will carry out work related to new node addition /
+ * update
+ */
+ NODE_UPDATE,
+ /**
+ * Operation will carry out work related to node removal
+ */
+ NODE_REMOVAL,
+ /**
+ * Operation will commit data to the operational data store
+ */
+ DATA_COMMIT_OPER_DS
+ }
+
+ private NodeId nodeId;
+ private StatsManagerOperationType operationType = StatsManagerOperationType.DATA_COMMIT_OPER_DS;
+
+ public StatDataStoreOperation(final StatsManagerOperationType operType, final NodeId id){
+ if(operType != null){
+ operationType = operType;
+ }
+ nodeId = id;
+ }
+
+ public final StatsManagerOperationType getType() {
+ return operationType;
+ }
+
+ public final NodeId getNodeId(){
+ return nodeId;
+ }
/**
- * Apply all read / write (put|merge) operation
- * for DataStore
+ * Apply all read / write (put|merge) operation for DataStore
+ *
* @param {@link ReadWriteTransaction} tx
*/
- void applyOperation(ReadWriteTransaction tx);
+ public abstract void applyOperation(ReadWriteTransaction tx);
}
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.md.statistics.manager.impl.helper.FlowComparator;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
return;
}
/* check flow Capable Node and write statistics */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
return;
}
/* add flow's statistics */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
/* Notification for continue collecting statistics */
notifyToCollectNextStatistics(nodeIdent, transId);
}
+
});
}
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
/* Get and Validate TransactionCacheContainer */
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
/* Get and Validate TransactionCacheContainer */
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FeatureCapability;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
Preconditions.checkNotNull(data, "SwitchFeatures data for {} can not be null!", keyIdent);
Preconditions.checkArgument(( ! keyIdent.isWildcarded()), "InstanceIdentifier is WildCarded!");
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.NODE_UPDATE,nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId()) {
+
@Override
public void applyOperation(final ReadWriteTransaction tx) {
Preconditions.checkArgument(nodeIdent != null, "InstanceIdentifier can not be NULL!");
Preconditions.checkArgument(( ! nodeIdent.isWildcarded()),
"InstanceIdentifier {} is WildCarded!", nodeIdent);
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.NODE_REMOVAL,nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId()) {
+
@Override
public void applyOperation(final ReadWriteTransaction tx) {
manager.disconnectedNodeUnregistration(nodeIdent);
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier.create(Nodes.class)
.child(Node.class, new NodeKey(nodeId));
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction trans) {
final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
return;
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction trans) {
final List<FlowTableAndStatisticsMap> tableStats = new ArrayList<FlowTableAndStatisticsMap>(10);
import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
private synchronized void cleanDataStoreOperQueue() {
// Drain all events, making sure any blocked threads are unblocked
while (! dataStoreOperQueue.isEmpty()) {
- dataStoreOperQueue.poll();
+ StatDataStoreOperation op = dataStoreOperQueue.poll();
+
+ // Execute the node removal clean up operation if queued in the
+ // operational queue.
+ if (op.getType() == StatsManagerOperationType.NODE_REMOVAL) {
+ try {
+ LOG.debug("Node {} disconnected. Cleaning internal data.",op.getNodeId());
+ op.applyOperation(null);
+ } catch (final Exception ex) {
+ LOG.warn("Unhandled exception while cleaning up internal data of node [{}]",op.getNodeId());
+ }
+ }
}
}