package org.opendaylight.controller.md.statistics.manager.impl;
-import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
/**
* statistics-manager
LOG.debug("STAT-MANAGER - QueueStatisticsUpdate: unregistred notification detect TransactionId {}", transId);
return;
}
+ manager.getRpcMsgManager().addNotification(notification, nodeId);
if (notification.isMoreReplies()) {
- manager.getRpcMsgManager().addNotification(notification, nodeId);
return;
}
- final List<QueueIdAndStatisticsMap> queueStats = notification.getQueueIdAndStatisticsMap() != null
- ? new ArrayList<>(notification.getQueueIdAndStatisticsMap()) : new ArrayList<QueueIdAndStatisticsMap>(10);
- final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
- if (txContainer.isPresent()) {
- final List<? extends TransactionAware> cachedNotifs =
- txContainer.get().getNotifications();
- for (final TransactionAware notif : cachedNotifs) {
- if (notif instanceof QueueStatisticsUpdate) {
- queueStats.addAll(((QueueStatisticsUpdate) notif).getQueueIdAndStatisticsMap());
- }
- }
- }
- final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier.create(Nodes.class)
- .child(Node.class, new NodeKey(nodeId));
- /* Queue statistics are small size and we are not able to change for OF cross controller
- * - don't need to make are atomic */
+
+ /* Don't block RPC Notification thread */
manager.enqueue(new StatDataStoreOperation() {
@Override
- public void applyOperation(final ReadWriteTransaction trans) {
- /* Notification for continue */
+ public void applyOperation(final ReadWriteTransaction tx) {
+
+ final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier.create(Nodes.class)
+ .child(Node.class, new NodeKey(nodeId));
+
+ /* Validate exist Node */
+ Optional<Node> fNode = Optional.absent();
+ try {
+ fNode = tx.read(LogicalDatastoreType.OPERATIONAL, nodeIdent).checkedGet();
+ }
+ catch (final ReadFailedException e) {
+ LOG.debug("Read Operational/DS for Node fail! {}", nodeIdent, e);
+ }
+ if ( ! fNode.isPresent()) {
+ LOG.trace("Read Operational/DS for Node fail! Node {} doesn't exist.", nodeIdent);
+ return;
+ }
+
+ /* Get and Validate TransactionCacheContainer */
+ final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
+ if ( ! isTransactionCacheContainerValid(txContainer)) {
+ return;
+ }
+ /* Prepare List actual Queues and not updated Queues will be removed */
+ final List<NodeConnector> existConnectors = fNode.get().getNodeConnector() != null
+ ? fNode.get().getNodeConnector() : Collections.<NodeConnector> emptyList();
+ final Map<QueueKey, NodeConnectorKey> existQueueKeys = new HashMap<>();
+ for (final NodeConnector connect : existConnectors) {
+ final List<Queue> listQueues = connect.getAugmentation(FlowCapableNodeConnector.class).getQueue();
+ if (listQueues != null) {
+ for (final Queue queue : listQueues) {
+ existQueueKeys.put(queue.getKey(), connect.getKey());
+ }
+ }
+ }
+ /* Queue processing */
+ statQueueCommit(txContainer, tx, nodeIdent, existQueueKeys);
+ /* Delete all not presented Group Nodes */
+ deleteAllNotPresentedNodes(nodeIdent, tx, Collections.unmodifiableMap(existQueueKeys));
+ /* Notification for continue collecting statistics */
notifyToCollectNextStatistics(nodeIdent);
- statQueueCommit(queueStats, nodeIdent, trans);
}
});
}
- private void statQueueCommit(final List<QueueIdAndStatisticsMap> queueStats,
- final InstanceIdentifier<Node> nodeIdent, final ReadWriteTransaction trans) {
+ private void statQueueCommit(
+ final Optional<TransactionCacheContainer<?>> txContainer, final ReadWriteTransaction tx,
+ final InstanceIdentifier<Node> nodeIdent, final Map<QueueKey, NodeConnectorKey> existQueueKeys) {
- /* check exist FlowCapableNode and write statistics */
- Optional<Node> fNode = Optional.absent();
- try {
- fNode = trans.read(LogicalDatastoreType.OPERATIONAL, nodeIdent).checkedGet();
- }
- catch (final ReadFailedException e) {
- LOG.debug("Read Operational/DS for Node fail! {}", nodeIdent, e);
- return;
+ Preconditions.checkNotNull(existQueueKeys);
+ Preconditions.checkNotNull(txContainer);
+ Preconditions.checkNotNull(nodeIdent);
+ Preconditions.checkNotNull(tx);
+
+ final List<? extends TransactionAware> cacheNotifs = txContainer.get().getNotifications();
+ for (final TransactionAware notif : cacheNotifs) {
+ if ( ! (notif instanceof QueueStatisticsUpdate)) {
+ break;
+ }
+ final List<QueueIdAndStatisticsMap> queueStats = ((QueueStatisticsUpdate) notif).getQueueIdAndStatisticsMap();
+ if (queueStats == null) {
+ break;
+ }
+ for (final QueueIdAndStatisticsMap queueStat : queueStats) {
+ if (queueStat.getQueueId() != null) {
+ final FlowCapableNodeConnectorQueueStatistics statChild =
+ new FlowCapableNodeConnectorQueueStatisticsBuilder(queueStat).build();
+ final FlowCapableNodeConnectorQueueStatisticsDataBuilder statBuild =
+ new FlowCapableNodeConnectorQueueStatisticsDataBuilder();
+ statBuild.setFlowCapableNodeConnectorQueueStatistics(statChild);
+ final QueueKey qKey = new QueueKey(queueStat.getQueueId());
+ final InstanceIdentifier<FlowCapableNodeConnectorQueueStatisticsData> queueStatIdent = nodeIdent
+ .child(NodeConnector.class, new NodeConnectorKey(queueStat.getNodeConnectorId()))
+ .augmentation(FlowCapableNodeConnector.class)
+ .child(Queue.class, qKey).augmentation(FlowCapableNodeConnectorQueueStatisticsData.class);
+ existQueueKeys.remove(qKey);
+ tx.put(LogicalDatastoreType.OPERATIONAL, queueStatIdent, statBuild.build());
+ }
+ }
}
- if ( ! fNode.isPresent()) {
- LOG.trace("Read Operational/DS for Node fail! Node {} doesn't exist.", nodeIdent);
+ }
+
+ private void deleteAllNotPresentedNodes(final InstanceIdentifier<Node> nodeIdent,
+ final ReadWriteTransaction tx, final Map<QueueKey, NodeConnectorKey> existQueueKeys) {
+
+ Preconditions.checkNotNull(nodeIdent);
+ Preconditions.checkNotNull(tx);
+
+ if (existQueueKeys == null) {
return;
}
- for (final QueueIdAndStatisticsMap queueEntry : queueStats) {
- final FlowCapableNodeConnectorQueueStatistics statChild =
- new FlowCapableNodeConnectorQueueStatisticsBuilder(queueEntry).build();
- final FlowCapableNodeConnectorQueueStatisticsDataBuilder statBuild =
- new FlowCapableNodeConnectorQueueStatisticsDataBuilder();
- statBuild.setFlowCapableNodeConnectorQueueStatistics(statChild);
- final QueueKey qKey = new QueueKey(queueEntry.getQueueId());
- final InstanceIdentifier<FlowCapableNodeConnectorQueueStatisticsData> queueStatIdent = nodeIdent
- .child(NodeConnector.class, new NodeConnectorKey(queueEntry.getNodeConnectorId()))
- .augmentation(FlowCapableNodeConnector.class)
- .child(Queue.class, qKey).augmentation(FlowCapableNodeConnectorQueueStatisticsData.class);
- trans.put(LogicalDatastoreType.OPERATIONAL, queueStatIdent, statBuild.build());
+ for (final Entry<QueueKey, NodeConnectorKey> entry : existQueueKeys.entrySet()) {
+ final InstanceIdentifier<Queue> queueIdent = nodeIdent.child(NodeConnector.class, entry.getValue())
+ .augmentation(FlowCapableNodeConnector.class).child(Queue.class, entry.getKey());
+ LOG.trace("Queue {} has to removed.", queueIdent);
+ Optional<Queue> delQueue = Optional.absent();
+ try {
+ delQueue = tx.read(LogicalDatastoreType.OPERATIONAL, queueIdent).checkedGet();
+ }
+ catch (final ReadFailedException e) {
+ // NOOP - probably another transaction delete that node
+ }
+ if (delQueue.isPresent()) {
+ tx.delete(LogicalDatastoreType.OPERATIONAL, queueIdent);
+ }
}
}
}