X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fstatistics-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fstatistics%2Fmanager%2Fimpl%2FStatListenCommitQueue.java;h=07e167d1e481aa427e53ba869182e10472fc28df;hp=e336f01874e25f48b2a13d1221b06147bc7c9df0;hb=227a056ef3f8d9dee9414948ea261b0d2348a1f1;hpb=462f9ceb7da71750eead2a15f47e6229980954ad diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitQueue.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitQueue.java index e336f01874..07e167d1e4 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitQueue.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitQueue.java @@ -8,8 +8,11 @@ package org.opendaylight.controller.md.statistics.manager.impl; -import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; @@ -42,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; /** * statistics-manager @@ -84,65 +88,120 @@ public class StatListenCommitQueue extends StatAbstractListenCommit queueStats = notification.getQueueIdAndStatisticsMap() != null - ? new ArrayList<>(notification.getQueueIdAndStatisticsMap()) : new ArrayList(10); - final Optional> txContainer = getTransactionCacheContainer(transId, nodeId); - if (txContainer.isPresent()) { - final List cachedNotifs = - txContainer.get().getNotifications(); - for (final TransactionAware notif : cachedNotifs) { - if (notif instanceof QueueStatisticsUpdate) { - queueStats.addAll(((QueueStatisticsUpdate) notif).getQueueIdAndStatisticsMap()); - } - } - } - final InstanceIdentifier nodeIdent = InstanceIdentifier.create(Nodes.class) - .child(Node.class, new NodeKey(nodeId)); - /* Queue statistics are small size and we are not able to change for OF cross controller - * - don't need to make are atomic */ + + /* Don't block RPC Notification thread */ manager.enqueue(new StatDataStoreOperation() { @Override - public void applyOperation(final ReadWriteTransaction trans) { - /* Notification for continue */ + public void applyOperation(final ReadWriteTransaction tx) { + + final InstanceIdentifier nodeIdent = InstanceIdentifier.create(Nodes.class) + .child(Node.class, new NodeKey(nodeId)); + + /* Validate exist Node */ + Optional fNode = Optional.absent(); + try { + fNode = tx.read(LogicalDatastoreType.OPERATIONAL, nodeIdent).checkedGet(); + } + catch (final ReadFailedException e) { + LOG.debug("Read Operational/DS for Node fail! {}", nodeIdent, e); + } + if ( ! fNode.isPresent()) { + LOG.trace("Read Operational/DS for Node fail! Node {} doesn't exist.", nodeIdent); + return; + } + + /* Get and Validate TransactionCacheContainer */ + final Optional> txContainer = getTransactionCacheContainer(transId, nodeId); + if ( ! isTransactionCacheContainerValid(txContainer)) { + return; + } + /* Prepare List actual Queues and not updated Queues will be removed */ + final List existConnectors = fNode.get().getNodeConnector() != null + ? fNode.get().getNodeConnector() : Collections. emptyList(); + final Map existQueueKeys = new HashMap<>(); + for (final NodeConnector connect : existConnectors) { + final List listQueues = connect.getAugmentation(FlowCapableNodeConnector.class).getQueue(); + if (listQueues != null) { + for (final Queue queue : listQueues) { + existQueueKeys.put(queue.getKey(), connect.getKey()); + } + } + } + /* Queue processing */ + statQueueCommit(txContainer, tx, nodeIdent, existQueueKeys); + /* Delete all not presented Group Nodes */ + deleteAllNotPresentedNodes(nodeIdent, tx, Collections.unmodifiableMap(existQueueKeys)); + /* Notification for continue collecting statistics */ notifyToCollectNextStatistics(nodeIdent); - statQueueCommit(queueStats, nodeIdent, trans); } }); } - private void statQueueCommit(final List queueStats, - final InstanceIdentifier nodeIdent, final ReadWriteTransaction trans) { + private void statQueueCommit( + final Optional> txContainer, final ReadWriteTransaction tx, + final InstanceIdentifier nodeIdent, final Map existQueueKeys) { - /* check exist FlowCapableNode and write statistics */ - Optional fNode = Optional.absent(); - try { - fNode = trans.read(LogicalDatastoreType.OPERATIONAL, nodeIdent).checkedGet(); - } - catch (final ReadFailedException e) { - LOG.debug("Read Operational/DS for Node fail! {}", nodeIdent, e); - return; + Preconditions.checkNotNull(existQueueKeys); + Preconditions.checkNotNull(txContainer); + Preconditions.checkNotNull(nodeIdent); + Preconditions.checkNotNull(tx); + + final List cacheNotifs = txContainer.get().getNotifications(); + for (final TransactionAware notif : cacheNotifs) { + if ( ! (notif instanceof QueueStatisticsUpdate)) { + break; + } + final List queueStats = ((QueueStatisticsUpdate) notif).getQueueIdAndStatisticsMap(); + if (queueStats == null) { + break; + } + for (final QueueIdAndStatisticsMap queueStat : queueStats) { + if (queueStat.getQueueId() != null) { + final FlowCapableNodeConnectorQueueStatistics statChild = + new FlowCapableNodeConnectorQueueStatisticsBuilder(queueStat).build(); + final FlowCapableNodeConnectorQueueStatisticsDataBuilder statBuild = + new FlowCapableNodeConnectorQueueStatisticsDataBuilder(); + statBuild.setFlowCapableNodeConnectorQueueStatistics(statChild); + final QueueKey qKey = new QueueKey(queueStat.getQueueId()); + final InstanceIdentifier queueStatIdent = nodeIdent + .child(NodeConnector.class, new NodeConnectorKey(queueStat.getNodeConnectorId())) + .augmentation(FlowCapableNodeConnector.class) + .child(Queue.class, qKey).augmentation(FlowCapableNodeConnectorQueueStatisticsData.class); + existQueueKeys.remove(qKey); + tx.put(LogicalDatastoreType.OPERATIONAL, queueStatIdent, statBuild.build()); + } + } } - if ( ! fNode.isPresent()) { - LOG.trace("Read Operational/DS for Node fail! Node {} doesn't exist.", nodeIdent); + } + + private void deleteAllNotPresentedNodes(final InstanceIdentifier nodeIdent, + final ReadWriteTransaction tx, final Map existQueueKeys) { + + Preconditions.checkNotNull(nodeIdent); + Preconditions.checkNotNull(tx); + + if (existQueueKeys == null) { return; } - for (final QueueIdAndStatisticsMap queueEntry : queueStats) { - final FlowCapableNodeConnectorQueueStatistics statChild = - new FlowCapableNodeConnectorQueueStatisticsBuilder(queueEntry).build(); - final FlowCapableNodeConnectorQueueStatisticsDataBuilder statBuild = - new FlowCapableNodeConnectorQueueStatisticsDataBuilder(); - statBuild.setFlowCapableNodeConnectorQueueStatistics(statChild); - final QueueKey qKey = new QueueKey(queueEntry.getQueueId()); - final InstanceIdentifier queueStatIdent = nodeIdent - .child(NodeConnector.class, new NodeConnectorKey(queueEntry.getNodeConnectorId())) - .augmentation(FlowCapableNodeConnector.class) - .child(Queue.class, qKey).augmentation(FlowCapableNodeConnectorQueueStatisticsData.class); - trans.put(LogicalDatastoreType.OPERATIONAL, queueStatIdent, statBuild.build()); + for (final Entry entry : existQueueKeys.entrySet()) { + final InstanceIdentifier queueIdent = nodeIdent.child(NodeConnector.class, entry.getValue()) + .augmentation(FlowCapableNodeConnector.class).child(Queue.class, entry.getKey()); + LOG.trace("Queue {} has to removed.", queueIdent); + Optional delQueue = Optional.absent(); + try { + delQueue = tx.read(LogicalDatastoreType.OPERATIONAL, queueIdent).checkedGet(); + } + catch (final ReadFailedException e) { + // NOOP - probably another transaction delete that node + } + if (delQueue.isPresent()) { + tx.delete(LogicalDatastoreType.OPERATIONAL, queueIdent); + } } } }