From 6fa3aa30c3eb47ca13520f9b4090bd03bf4e191f Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Thu, 13 Feb 2014 23:37:20 +0100 Subject: [PATCH] Move MultipartMessageManager into NodeStatisticsHandler There is no reason to have a global manager, now each NodeStatisticsHandler instantiates its own manager -- thus the nodeId in its tables has no point. This has the added value of being protected by the NodeStatisticsHandler lock, thus we are prepared to having a completely consistent view of what operations are really expected even when switches flap. Change-Id: I4ca63982506c6f290967040d5626400cb0a0996e Signed-off-by: Robert Varga --- .../manager/MultipartMessageManager.java | 66 ++++++++--------- .../manager/NodeStatisticsHandler.java | 72 +++++++++++++------ .../manager/StatisticsListener.java | 61 +++------------- .../manager/StatisticsProvider.java | 56 +++++---------- 4 files changed, 106 insertions(+), 149 deletions(-) diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MultipartMessageManager.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MultipartMessageManager.java index 2201cb3930..36062805c8 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MultipartMessageManager.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MultipartMessageManager.java @@ -12,8 +12,10 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; + +import com.google.common.base.Preconditions; /** * Main responsibility of the class is to manage multipart response @@ -23,6 +25,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; * */ public class MultipartMessageManager { + private static final int NUMBER_OF_WAIT_CYCLES = 2; /* * Map for tx id and type of request, to keep track of all the request sent @@ -36,24 +39,17 @@ public class MultipartMessageManager { */ private final Map txIdTotableIdMap = new ConcurrentHashMap<>(); - private static final int NUMBER_OF_WAIT_CYCLES =2; - private static final class TxIdEntry { - private final TransactionId txId; - private final NodeId nodeId; private final StatsRequestType requestType; + private final TransactionId txId; - public TxIdEntry(NodeId nodeId, TransactionId txId, StatsRequestType requestType){ + public TxIdEntry(TransactionId txId, StatsRequestType requestType){ this.txId = txId; - this.nodeId = nodeId; this.requestType = requestType; } public TransactionId getTxId() { return txId; } - public NodeId getNodeId() { - return nodeId; - } public StatsRequestType getRequestType() { return requestType; } @@ -61,7 +57,6 @@ public class MultipartMessageManager { public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode()); result = prime * result + ((txId == null) ? 0 : txId.hashCode()); return result; } @@ -78,13 +73,6 @@ public class MultipartMessageManager { } TxIdEntry other = (TxIdEntry) obj; - if (nodeId == null) { - if (other.nodeId != null) { - return false; - } - } else if (!nodeId.equals(other.nodeId)) { - return false; - } if (txId == null) { if (other.txId != null) { return false; @@ -97,36 +85,42 @@ public class MultipartMessageManager { @Override public String toString() { - return "TxIdEntry [txId=" + txId + ", nodeId=" + nodeId + ", requestType=" + requestType + "]"; + return "TxIdEntry [txId=" + txId + ", requestType=" + requestType + "]"; } } - public Short getTableIdForTxId(NodeId nodeId,TransactionId id){ - return txIdTotableIdMap.get(new TxIdEntry(nodeId,id,null)); + public void recordExpectedTableTransaction(TransactionId id, StatsRequestType type, Short tableId) { + recordExpectedTransaction(id, type); + txIdTotableIdMap.put(new TxIdEntry(id, null), Preconditions.checkNotNull(tableId)); } - public void setTxIdAndTableIdMapEntry(NodeId nodeId, TransactionId id,Short tableId){ - if(id == null) - return; - txIdTotableIdMap.put(new TxIdEntry(nodeId,id,null), tableId); - } + public Short isExpectedTableTransaction(TransactionAware transaction, Boolean more) { + if (!isExpectedTransaction(transaction, more)) { + return null; + } - public boolean isRequestTxIdExist(NodeId nodeId, TransactionId id, Boolean moreRepliesToFollow){ - TxIdEntry entry = new TxIdEntry(nodeId,id,null); - if(moreRepliesToFollow.booleanValue()){ - return txIdToRequestTypeMap.containsKey(entry); - }else{ - return txIdToRequestTypeMap.remove(entry) != null; + final TxIdEntry key = new TxIdEntry(transaction.getTransactionId(), null); + if (more != null && more.booleanValue()) { + return txIdTotableIdMap.get(key); + } else { + return txIdTotableIdMap.remove(key); } } - public void addTxIdToRequestTypeEntry (NodeId nodeId, TransactionId id,StatsRequestType type){ - if(id == null) - return; - TxIdEntry entry = new TxIdEntry(nodeId,id,type); + public void recordExpectedTransaction(TransactionId id, StatsRequestType type) { + TxIdEntry entry = new TxIdEntry(Preconditions.checkNotNull(id), Preconditions.checkNotNull(type)); txIdToRequestTypeMap.put(entry, getExpiryTime()); } + public boolean isExpectedTransaction(TransactionAware transaction, Boolean more) { + TxIdEntry entry = new TxIdEntry(transaction.getTransactionId(), null); + if (more != null && more.booleanValue()) { + return txIdToRequestTypeMap.containsKey(entry); + } else { + return txIdToRequestTypeMap.remove(entry) != null; + } + } + private static Long getExpiryTime(){ return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos( StatisticsProvider.STATS_COLLECTION_MILLIS*NUMBER_OF_WAIT_CYCLES); diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java index 17f1ce2a7b..45788b3319 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java @@ -11,6 +11,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType; import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; import org.opendaylight.controller.sal.binding.api.data.DataProviderService; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; @@ -22,6 +23,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.A import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeaturesBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeaturesBuilder; @@ -59,6 +62,7 @@ public final class NodeStatisticsHandler implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class); private static final int NUMBER_OF_WAIT_CYCLES = 2; + private final MultipartMessageManager msgManager = new MultipartMessageManager(); private final InstanceIdentifier targetNodeIdentifier; private final FlowStatsTracker flowStats; private final FlowTableStatsTracker flowTableStats; @@ -105,38 +109,52 @@ public final class NodeStatisticsHandler implements AutoCloseable { return targetNodeRef; } - public synchronized void updateGroupDescStats(List list) { - groupDescStats.updateStats(list); + public synchronized void updateGroupDescStats(TransactionAware transaction, Boolean more, List list) { + if (msgManager.isExpectedTransaction(transaction, more)) { + groupDescStats.updateStats(list); + } } - public synchronized void updateGroupStats(List list) { - groupStats.updateStats(list); + public synchronized void updateGroupStats(TransactionAware transaction, Boolean more, List list) { + if (msgManager.isExpectedTransaction(transaction, more)) { + groupStats.updateStats(list); + } } - public synchronized void updateMeterConfigStats(List list) { - meterConfigStats.updateStats(list); + public synchronized void updateMeterConfigStats(TransactionAware transaction, Boolean more, List list) { + if (msgManager.isExpectedTransaction(transaction, more)) { + meterConfigStats.updateStats(list); + } } - public synchronized void updateMeterStats(List list) { - meterStats.updateStats(list); + public synchronized void updateMeterStats(TransactionAware transaction, Boolean more, List list) { + if (msgManager.isExpectedTransaction(transaction, more)) { + meterStats.updateStats(list); + } } - public synchronized void updateQueueStats(List list) { - queueStats.updateStats(list); + public synchronized void updateQueueStats(TransactionAware transaction, Boolean more, List list) { + if (msgManager.isExpectedTransaction(transaction, more)) { + queueStats.updateStats(list); + } } - public synchronized void updateFlowTableStats(List list) { - flowTableStats.updateStats(list); + public synchronized void updateFlowTableStats(TransactionAware transaction, Boolean more, List list) { + if (msgManager.isExpectedTransaction(transaction, more)) { + flowTableStats.updateStats(list); + } } - public synchronized void updateNodeConnectorStats(List list) { - nodeConnectorStats.updateStats(list); + public synchronized void updateNodeConnectorStats(TransactionAware transaction, Boolean more, List list) { + if (msgManager.isExpectedTransaction(transaction, more)) { + nodeConnectorStats.updateStats(list); + } } - public synchronized void updateAggregateFlowStats(Short tableId, AggregateFlowStatistics flowStats) { + public synchronized void updateAggregateFlowStats(TransactionAware transaction, Boolean more, AggregateFlowStatistics flowStats) { + final Short tableId = msgManager.isExpectedTableTransaction(transaction, more); if (tableId != null) { final DataModificationTransaction trans = dps.beginTransaction(); - InstanceIdentifier tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey) .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance(); @@ -153,11 +171,16 @@ public final class NodeStatisticsHandler implements AutoCloseable { tableBuilder.addAugmentation(AggregateFlowStatisticsData.class, aggregateFlowStatisticsDataBuilder.build()); trans.putOperationalData(tableRef, tableBuilder.build()); - // FIXME: should we be tracking this data? trans.commit(); } } + public synchronized void updateFlowStats(TransactionAware transaction, Boolean more, List list) { + if (msgManager.isExpectedTransaction(transaction, more)) { + flowStats.updateStats(list); + } + } + public synchronized void updateGroupFeatures(GroupFeatures notification) { final DataModificationTransaction trans = dps.beginTransaction(); @@ -194,10 +217,6 @@ public final class NodeStatisticsHandler implements AutoCloseable { trans.commit(); } - public synchronized void updateFlowStats(List list) { - flowStats.updateStats(list); - } - public synchronized void cleanStaleStatistics() { final DataModificationTransaction trans = dps.beginTransaction(); final long now = System.nanoTime(); @@ -209,6 +228,7 @@ public final class NodeStatisticsHandler implements AutoCloseable { meterStats.cleanup(trans, now); nodeConnectorStats.cleanup(trans, now); queueStats.cleanup(trans, now); + msgManager.cleanStaleTransactionIds(); trans.commit(); } @@ -218,4 +238,14 @@ public final class NodeStatisticsHandler implements AutoCloseable { // FIXME: cleanup any resources we hold (registrations, etc.) logger.debug("Statistics handler for {} shut down", targetNodeKey.getId()); } + + // FIXME: this should be private + public synchronized void recordExpectedTransaction(TransactionId transactionId, StatsRequestType reqType) { + msgManager.recordExpectedTransaction(transactionId, reqType); + } + + // FIXME: this should be private + public synchronized void recordExpectedTableTransaction(TransactionId transactionId, Short tableId) { + msgManager.recordExpectedTableTransaction(transactionId, StatsRequestType.AGGR_FLOW, tableId); + } } diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsListener.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsListener.java index 155815dc88..bd9f96c875 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsListener.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsListener.java @@ -44,7 +44,6 @@ public class StatisticsListener implements OpendaylightGroupStatisticsListener, private final static Logger sucLogger = LoggerFactory.getLogger(StatisticsListener.class); private final StatisticsProvider statisticsManager; - private final MultipartMessageManager messageManager; /** * default ctor @@ -52,56 +51,37 @@ public class StatisticsListener implements OpendaylightGroupStatisticsListener, */ public StatisticsListener(final StatisticsProvider manager){ this.statisticsManager = manager; - this.messageManager = this.statisticsManager.getMultipartMessageManager(); } @Override public void onMeterConfigStatsUpdated(final MeterConfigStatsUpdated notification) { - //Check if response is for the request statistics-manager sent. - if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies())) - return; - - //Add statistics to local cache final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId()); if (handler != null) { - handler.updateMeterConfigStats(notification.getMeterConfigStats()); + handler.updateMeterConfigStats(notification, notification.isMoreReplies(), notification.getMeterConfigStats()); } } @Override public void onMeterStatisticsUpdated(MeterStatisticsUpdated notification) { - //Check if response is for the request statistics-manager sent. - if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies())) - return; - - //Add statistics to local cache final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId()); if (handler != null) { - handler.updateMeterStats(notification.getMeterStats()); + handler.updateMeterStats(notification, notification.isMoreReplies(), notification.getMeterStats()); } } @Override public void onGroupDescStatsUpdated(GroupDescStatsUpdated notification) { - //Check if response is for the request statistics-manager sent. - if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies())) - return; - final NodeStatisticsHandler handler = statisticsManager.getStatisticsHandler(notification.getId()); if (handler != null) { - handler.updateGroupDescStats(notification.getGroupDescStats()); + handler.updateGroupDescStats(notification, notification.isMoreReplies(), notification.getGroupDescStats()); } } @Override public void onGroupStatisticsUpdated(GroupStatisticsUpdated notification) { - //Check if response is for the request statistics-manager sent. - if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies())) - return; - final NodeStatisticsHandler handler = statisticsManager.getStatisticsHandler(notification.getId()); if (handler != null) { - handler.updateGroupStats(notification.getGroupStats()); + handler.updateGroupStats(notification, notification.isMoreReplies(), notification.getGroupStats()); } } @@ -123,65 +103,42 @@ public class StatisticsListener implements OpendaylightGroupStatisticsListener, @Override public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) { - //Check if response is for the request statistics-manager sent. - if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies())) - return; - sucLogger.debug("Received flow stats update : {}",notification.toString()); final NodeStatisticsHandler sna = this.statisticsManager.getStatisticsHandler(notification.getId()); if (sna != null) { - sna.updateFlowStats(notification.getFlowAndStatisticsMapList()); + sna.updateFlowStats(notification, notification.isMoreReplies(), notification.getFlowAndStatisticsMapList()); } } @Override public void onAggregateFlowStatisticsUpdate(AggregateFlowStatisticsUpdate notification) { - //Check if response is for the request statistics-manager sent. - if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies())) - return; - final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId()); if (handler != null) { - final Short tableId = messageManager.getTableIdForTxId(notification.getId(),notification.getTransactionId()); - handler.updateAggregateFlowStats(tableId, notification); + handler.updateAggregateFlowStats(notification, notification.isMoreReplies(), notification); } } @Override public void onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate notification) { - //Check if response is for the request statistics-manager sent. - if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies())) - return; - final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId()); if (handler != null) { - handler.updateNodeConnectorStats(notification.getNodeConnectorStatisticsAndPortNumberMap()); + handler.updateNodeConnectorStats(notification, notification.isMoreReplies(), notification.getNodeConnectorStatisticsAndPortNumberMap()); } } @Override public void onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate notification) { - //Check if response is for the request statistics-manager sent. - if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies())) - return; - final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId()); if (handler != null) { - handler.updateFlowTableStats(notification.getFlowTableAndStatisticsMap()); + handler.updateFlowTableStats(notification, notification.isMoreReplies(), notification.getFlowTableAndStatisticsMap()); } } @Override public void onQueueStatisticsUpdate(QueueStatisticsUpdate notification) { - //Check if response is for the request statistics-manager sent. - if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies())) - return; - - //Add statistics to local cache final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId()); if (handler != null) { - handler.updateQueueStats(notification.getQueueIdAndStatisticsMap()); + handler.updateQueueStats(notification, notification.isMoreReplies(), notification.getQueueIdAndStatisticsMap()); } } } - diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java index e9d2356cf9..3ee059d1c0 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java @@ -96,7 +96,6 @@ public class StatisticsProvider implements AutoCloseable { private static final Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class); private final ConcurrentMap handlers = new ConcurrentHashMap<>(); - private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager(); private final Timer timer = new Timer("statistics-manager", true); private final DataProviderService dps; @@ -118,10 +117,6 @@ public class StatisticsProvider implements AutoCloseable { this.dps = Preconditions.checkNotNull(dataService); } - public MultipartMessageManager getMultipartMessageManager() { - return multipartMessageManager; - } - private final StatisticsListener updateCommiter = new StatisticsListener(StatisticsProvider.this); private Registration listenerRegistration; @@ -165,7 +160,6 @@ public class StatisticsProvider implements AutoCloseable { nodeStatisticsAger.cleanStaleStatistics(); } - multipartMessageManager.cleanStaleTransactionIds(); } catch (RuntimeException e) { spLogger.warn("Failed to request statistics", e); } @@ -255,9 +249,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = flowTableStatsService.getFlowTablesStatistics(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),response.get().getResult().getTransactionId() - , StatsRequestType.ALL_FLOW_TABLE); - + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW_TABLE); } private void sendAllFlowsStatsFromAllTablesRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{ @@ -269,9 +261,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId() - , StatsRequestType.ALL_FLOW); - + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW); } public void sendFlowStatsFromTableRequest(NodeKey node, Flow flow) throws InterruptedException, ExecutionException { @@ -283,16 +273,14 @@ public class StatisticsProvider implements AutoCloseable { private void sendFlowStatsFromTableRequest(NodeStatisticsHandler h, Flow flow) throws InterruptedException, ExecutionException{ final GetFlowStatisticsFromFlowTableInputBuilder input = - new GetFlowStatisticsFromFlowTableInputBuilder(); + new GetFlowStatisticsFromFlowTableInputBuilder(flow); input.setNode(h.getTargetNodeRef()); - input.fieldsFrom(flow); Future> response = flowStatsService.getFlowStatisticsFromFlowTable(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), - response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW); + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW); } private void sendAggregateFlowsStatsFromAllTablesRequest(final NodeStatisticsHandler h) throws InterruptedException, ExecutionException{ @@ -300,24 +288,22 @@ public class StatisticsProvider implements AutoCloseable { spLogger.debug("Node {} supports {} table(s)", h, tables.size()); for (TableKey key : h.getKnownTables()) { - sendAggregateFlowsStatsFromTableRequest(h.getTargetNodeKey(), key.getId().shortValue()); + sendAggregateFlowsStatsFromTableRequest(h, key.getId().shortValue()); } } - private void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{ + private void sendAggregateFlowsStatsFromTableRequest(final NodeStatisticsHandler h, Short tableId) throws InterruptedException, ExecutionException{ - spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey); + spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId, h.getTargetNodeKey()); GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder(); - input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance())); + input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, h.getTargetNodeKey()).toInstance())); input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId)); Future> response = flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build()); - multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), tableId); - this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId() - , StatsRequestType.AGGR_FLOW); + h.recordExpectedTableTransaction(response.get().getResult().getTransactionId(), tableId); } private void sendAllNodeConnectorsStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{ @@ -328,9 +314,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = portStatsService.getAllNodeConnectorsStatistics(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId() - , StatsRequestType.ALL_PORT); - + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_PORT); } private void sendAllGroupStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{ @@ -342,9 +326,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = groupStatsService.getAllGroupStatistics(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId() - , StatsRequestType.ALL_GROUP); - + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_GROUP); } public void sendGroupDescriptionRequest(NodeKey node) throws InterruptedException, ExecutionException{ @@ -362,8 +344,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = groupStatsService.getGroupDescription(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), - response.get().getResult().getTransactionId(), StatsRequestType.GROUP_DESC); + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.GROUP_DESC); } private void sendAllMeterStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{ @@ -375,9 +356,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = meterStatsService.getAllMeterStatistics(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId() - , StatsRequestType.ALL_METER);; - + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_METER); } public void sendMeterConfigStatisticsRequest(NodeKey node) throws InterruptedException, ExecutionException { @@ -396,8 +375,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = meterStatsService.getAllMeterConfigStatistics(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), - response.get().getResult().getTransactionId(), StatsRequestType.METER_CONFIG);; + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.METER_CONFIG);; } private void sendAllQueueStatsFromAllNodeConnector(NodeStatisticsHandler h) throws InterruptedException, ExecutionException { @@ -408,8 +386,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), - response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);; + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);; } public void sendQueueStatsFromGivenNodeConnector(NodeKey node,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException { @@ -428,8 +405,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = queueStatsService.getQueueStatisticsFromGivenPort(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), - response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);; + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);; } /** -- 2.36.6