import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+
+import com.google.common.base.Preconditions;
/**
* Main responsibility of the class is to manage multipart response
*
*/
public class MultipartMessageManager {
+ private static final int NUMBER_OF_WAIT_CYCLES = 2;
/*
* Map for tx id and type of request, to keep track of all the request sent
*/
private final Map<TxIdEntry,Short> txIdTotableIdMap = new ConcurrentHashMap<>();
- private static final int NUMBER_OF_WAIT_CYCLES =2;
-
private static final class TxIdEntry {
- private final TransactionId txId;
- private final NodeId nodeId;
private final StatsRequestType requestType;
+ private final TransactionId txId;
- public TxIdEntry(NodeId nodeId, TransactionId txId, StatsRequestType requestType){
+ public TxIdEntry(TransactionId txId, StatsRequestType requestType){
this.txId = txId;
- this.nodeId = nodeId;
this.requestType = requestType;
}
public TransactionId getTxId() {
return txId;
}
- public NodeId getNodeId() {
- return nodeId;
- }
public StatsRequestType getRequestType() {
return requestType;
}
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
result = prime * result + ((txId == null) ? 0 : txId.hashCode());
return result;
}
}
TxIdEntry other = (TxIdEntry) obj;
- if (nodeId == null) {
- if (other.nodeId != null) {
- return false;
- }
- } else if (!nodeId.equals(other.nodeId)) {
- return false;
- }
if (txId == null) {
if (other.txId != null) {
return false;
@Override
public String toString() {
- return "TxIdEntry [txId=" + txId + ", nodeId=" + nodeId + ", requestType=" + requestType + "]";
+ return "TxIdEntry [txId=" + txId + ", requestType=" + requestType + "]";
}
}
- public Short getTableIdForTxId(NodeId nodeId,TransactionId id){
- return txIdTotableIdMap.get(new TxIdEntry(nodeId,id,null));
+ public void recordExpectedTableTransaction(TransactionId id, StatsRequestType type, Short tableId) {
+ recordExpectedTransaction(id, type);
+ txIdTotableIdMap.put(new TxIdEntry(id, null), Preconditions.checkNotNull(tableId));
}
- public void setTxIdAndTableIdMapEntry(NodeId nodeId, TransactionId id,Short tableId){
- if(id == null)
- return;
- txIdTotableIdMap.put(new TxIdEntry(nodeId,id,null), tableId);
- }
+ public Short isExpectedTableTransaction(TransactionAware transaction, Boolean more) {
+ if (!isExpectedTransaction(transaction, more)) {
+ return null;
+ }
- public boolean isRequestTxIdExist(NodeId nodeId, TransactionId id, Boolean moreRepliesToFollow){
- TxIdEntry entry = new TxIdEntry(nodeId,id,null);
- if(moreRepliesToFollow.booleanValue()){
- return txIdToRequestTypeMap.containsKey(entry);
- }else{
- return txIdToRequestTypeMap.remove(entry) != null;
+ final TxIdEntry key = new TxIdEntry(transaction.getTransactionId(), null);
+ if (more != null && more.booleanValue()) {
+ return txIdTotableIdMap.get(key);
+ } else {
+ return txIdTotableIdMap.remove(key);
}
}
- public void addTxIdToRequestTypeEntry (NodeId nodeId, TransactionId id,StatsRequestType type){
- if(id == null)
- return;
- TxIdEntry entry = new TxIdEntry(nodeId,id,type);
+ public void recordExpectedTransaction(TransactionId id, StatsRequestType type) {
+ TxIdEntry entry = new TxIdEntry(Preconditions.checkNotNull(id), Preconditions.checkNotNull(type));
txIdToRequestTypeMap.put(entry, getExpiryTime());
}
+ public boolean isExpectedTransaction(TransactionAware transaction, Boolean more) {
+ TxIdEntry entry = new TxIdEntry(transaction.getTransactionId(), null);
+ if (more != null && more.booleanValue()) {
+ return txIdToRequestTypeMap.containsKey(entry);
+ } else {
+ return txIdToRequestTypeMap.remove(entry) != null;
+ }
+ }
+
private static Long getExpiryTime(){
return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(
StatisticsProvider.STATS_COLLECTION_MILLIS*NUMBER_OF_WAIT_CYCLES);
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeaturesBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeaturesBuilder;
private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class);
private static final int NUMBER_OF_WAIT_CYCLES = 2;
+ private final MultipartMessageManager msgManager = new MultipartMessageManager();
private final InstanceIdentifier<Node> targetNodeIdentifier;
private final FlowStatsTracker flowStats;
private final FlowTableStatsTracker flowTableStats;
return targetNodeRef;
}
- public synchronized void updateGroupDescStats(List<GroupDescStats> list) {
- groupDescStats.updateStats(list);
+ public synchronized void updateGroupDescStats(TransactionAware transaction, Boolean more, List<GroupDescStats> list) {
+ if (msgManager.isExpectedTransaction(transaction, more)) {
+ groupDescStats.updateStats(list);
+ }
}
- public synchronized void updateGroupStats(List<GroupStats> list) {
- groupStats.updateStats(list);
+ public synchronized void updateGroupStats(TransactionAware transaction, Boolean more, List<GroupStats> list) {
+ if (msgManager.isExpectedTransaction(transaction, more)) {
+ groupStats.updateStats(list);
+ }
}
- public synchronized void updateMeterConfigStats(List<MeterConfigStats> list) {
- meterConfigStats.updateStats(list);
+ public synchronized void updateMeterConfigStats(TransactionAware transaction, Boolean more, List<MeterConfigStats> list) {
+ if (msgManager.isExpectedTransaction(transaction, more)) {
+ meterConfigStats.updateStats(list);
+ }
}
- public synchronized void updateMeterStats(List<MeterStats> list) {
- meterStats.updateStats(list);
+ public synchronized void updateMeterStats(TransactionAware transaction, Boolean more, List<MeterStats> list) {
+ if (msgManager.isExpectedTransaction(transaction, more)) {
+ meterStats.updateStats(list);
+ }
}
- public synchronized void updateQueueStats(List<QueueIdAndStatisticsMap> list) {
- queueStats.updateStats(list);
+ public synchronized void updateQueueStats(TransactionAware transaction, Boolean more, List<QueueIdAndStatisticsMap> list) {
+ if (msgManager.isExpectedTransaction(transaction, more)) {
+ queueStats.updateStats(list);
+ }
}
- public synchronized void updateFlowTableStats(List<FlowTableAndStatisticsMap> list) {
- flowTableStats.updateStats(list);
+ public synchronized void updateFlowTableStats(TransactionAware transaction, Boolean more, List<FlowTableAndStatisticsMap> list) {
+ if (msgManager.isExpectedTransaction(transaction, more)) {
+ flowTableStats.updateStats(list);
+ }
}
- public synchronized void updateNodeConnectorStats(List<NodeConnectorStatisticsAndPortNumberMap> list) {
- nodeConnectorStats.updateStats(list);
+ public synchronized void updateNodeConnectorStats(TransactionAware transaction, Boolean more, List<NodeConnectorStatisticsAndPortNumberMap> list) {
+ if (msgManager.isExpectedTransaction(transaction, more)) {
+ nodeConnectorStats.updateStats(list);
+ }
}
- public synchronized void updateAggregateFlowStats(Short tableId, AggregateFlowStatistics flowStats) {
+ public synchronized void updateAggregateFlowStats(TransactionAware transaction, Boolean more, AggregateFlowStatistics flowStats) {
+ final Short tableId = msgManager.isExpectedTableTransaction(transaction, more);
if (tableId != null) {
final DataModificationTransaction trans = dps.beginTransaction();
-
InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
.augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
tableBuilder.addAugmentation(AggregateFlowStatisticsData.class, aggregateFlowStatisticsDataBuilder.build());
trans.putOperationalData(tableRef, tableBuilder.build());
- // FIXME: should we be tracking this data?
trans.commit();
}
}
+ public synchronized void updateFlowStats(TransactionAware transaction, Boolean more, List<FlowAndStatisticsMapList> list) {
+ if (msgManager.isExpectedTransaction(transaction, more)) {
+ flowStats.updateStats(list);
+ }
+ }
+
public synchronized void updateGroupFeatures(GroupFeatures notification) {
final DataModificationTransaction trans = dps.beginTransaction();
trans.commit();
}
- public synchronized void updateFlowStats(List<FlowAndStatisticsMapList> list) {
- flowStats.updateStats(list);
- }
-
public synchronized void cleanStaleStatistics() {
final DataModificationTransaction trans = dps.beginTransaction();
final long now = System.nanoTime();
meterStats.cleanup(trans, now);
nodeConnectorStats.cleanup(trans, now);
queueStats.cleanup(trans, now);
+ msgManager.cleanStaleTransactionIds();
trans.commit();
}
// FIXME: cleanup any resources we hold (registrations, etc.)
logger.debug("Statistics handler for {} shut down", targetNodeKey.getId());
}
+
+ // FIXME: this should be private
+ public synchronized void recordExpectedTransaction(TransactionId transactionId, StatsRequestType reqType) {
+ msgManager.recordExpectedTransaction(transactionId, reqType);
+ }
+
+ // FIXME: this should be private
+ public synchronized void recordExpectedTableTransaction(TransactionId transactionId, Short tableId) {
+ msgManager.recordExpectedTableTransaction(transactionId, StatsRequestType.AGGR_FLOW, tableId);
+ }
}
private final static Logger sucLogger = LoggerFactory.getLogger(StatisticsListener.class);
private final StatisticsProvider statisticsManager;
- private final MultipartMessageManager messageManager;
/**
* default ctor
*/
public StatisticsListener(final StatisticsProvider manager){
this.statisticsManager = manager;
- this.messageManager = this.statisticsManager.getMultipartMessageManager();
}
@Override
public void onMeterConfigStatsUpdated(final MeterConfigStatsUpdated notification) {
- //Check if response is for the request statistics-manager sent.
- if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
- return;
-
- //Add statistics to local cache
final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
if (handler != null) {
- handler.updateMeterConfigStats(notification.getMeterConfigStats());
+ handler.updateMeterConfigStats(notification, notification.isMoreReplies(), notification.getMeterConfigStats());
}
}
@Override
public void onMeterStatisticsUpdated(MeterStatisticsUpdated notification) {
- //Check if response is for the request statistics-manager sent.
- if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
- return;
-
- //Add statistics to local cache
final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
if (handler != null) {
- handler.updateMeterStats(notification.getMeterStats());
+ handler.updateMeterStats(notification, notification.isMoreReplies(), notification.getMeterStats());
}
}
@Override
public void onGroupDescStatsUpdated(GroupDescStatsUpdated notification) {
- //Check if response is for the request statistics-manager sent.
- if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
- return;
-
final NodeStatisticsHandler handler = statisticsManager.getStatisticsHandler(notification.getId());
if (handler != null) {
- handler.updateGroupDescStats(notification.getGroupDescStats());
+ handler.updateGroupDescStats(notification, notification.isMoreReplies(), notification.getGroupDescStats());
}
}
@Override
public void onGroupStatisticsUpdated(GroupStatisticsUpdated notification) {
- //Check if response is for the request statistics-manager sent.
- if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
- return;
-
final NodeStatisticsHandler handler = statisticsManager.getStatisticsHandler(notification.getId());
if (handler != null) {
- handler.updateGroupStats(notification.getGroupStats());
+ handler.updateGroupStats(notification, notification.isMoreReplies(), notification.getGroupStats());
}
}
@Override
public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
- //Check if response is for the request statistics-manager sent.
- if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
- return;
-
sucLogger.debug("Received flow stats update : {}",notification.toString());
final NodeStatisticsHandler sna = this.statisticsManager.getStatisticsHandler(notification.getId());
if (sna != null) {
- sna.updateFlowStats(notification.getFlowAndStatisticsMapList());
+ sna.updateFlowStats(notification, notification.isMoreReplies(), notification.getFlowAndStatisticsMapList());
}
}
@Override
public void onAggregateFlowStatisticsUpdate(AggregateFlowStatisticsUpdate notification) {
- //Check if response is for the request statistics-manager sent.
- if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
- return;
-
final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
if (handler != null) {
- final Short tableId = messageManager.getTableIdForTxId(notification.getId(),notification.getTransactionId());
- handler.updateAggregateFlowStats(tableId, notification);
+ handler.updateAggregateFlowStats(notification, notification.isMoreReplies(), notification);
}
}
@Override
public void onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate notification) {
- //Check if response is for the request statistics-manager sent.
- if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
- return;
-
final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
if (handler != null) {
- handler.updateNodeConnectorStats(notification.getNodeConnectorStatisticsAndPortNumberMap());
+ handler.updateNodeConnectorStats(notification, notification.isMoreReplies(), notification.getNodeConnectorStatisticsAndPortNumberMap());
}
}
@Override
public void onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate notification) {
- //Check if response is for the request statistics-manager sent.
- if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
- return;
-
final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
if (handler != null) {
- handler.updateFlowTableStats(notification.getFlowTableAndStatisticsMap());
+ handler.updateFlowTableStats(notification, notification.isMoreReplies(), notification.getFlowTableAndStatisticsMap());
}
}
@Override
public void onQueueStatisticsUpdate(QueueStatisticsUpdate notification) {
- //Check if response is for the request statistics-manager sent.
- if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
- return;
-
- //Add statistics to local cache
final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
if (handler != null) {
- handler.updateQueueStats(notification.getQueueIdAndStatisticsMap());
+ handler.updateQueueStats(notification, notification.isMoreReplies(), notification.getQueueIdAndStatisticsMap());
}
}
}
-
private static final Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
private final ConcurrentMap<NodeId, NodeStatisticsHandler> handlers = new ConcurrentHashMap<>();
- private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
private final Timer timer = new Timer("statistics-manager", true);
private final DataProviderService dps;
this.dps = Preconditions.checkNotNull(dataService);
}
- public MultipartMessageManager getMultipartMessageManager() {
- return multipartMessageManager;
- }
-
private final StatisticsListener updateCommiter = new StatisticsListener(StatisticsProvider.this);
private Registration<NotificationListener> listenerRegistration;
nodeStatisticsAger.cleanStaleStatistics();
}
- multipartMessageManager.cleanStaleTransactionIds();
} catch (RuntimeException e) {
spLogger.warn("Failed to request statistics", e);
}
Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
flowTableStatsService.getFlowTablesStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),response.get().getResult().getTransactionId()
- , StatsRequestType.ALL_FLOW_TABLE);
-
+ h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW_TABLE);
}
private void sendAllFlowsStatsFromAllTablesRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
- , StatsRequestType.ALL_FLOW);
-
+ h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW);
}
public void sendFlowStatsFromTableRequest(NodeKey node, Flow flow) throws InterruptedException, ExecutionException {
private void sendFlowStatsFromTableRequest(NodeStatisticsHandler h, Flow flow) throws InterruptedException, ExecutionException{
final GetFlowStatisticsFromFlowTableInputBuilder input =
- new GetFlowStatisticsFromFlowTableInputBuilder();
+ new GetFlowStatisticsFromFlowTableInputBuilder(flow);
input.setNode(h.getTargetNodeRef());
- input.fieldsFrom(flow);
Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response =
flowStatsService.getFlowStatisticsFromFlowTable(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),
- response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW);
+ h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW);
}
private void sendAggregateFlowsStatsFromAllTablesRequest(final NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
spLogger.debug("Node {} supports {} table(s)", h, tables.size());
for (TableKey key : h.getKnownTables()) {
- sendAggregateFlowsStatsFromTableRequest(h.getTargetNodeKey(), key.getId().shortValue());
+ sendAggregateFlowsStatsFromTableRequest(h, key.getId().shortValue());
}
}
- private void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
+ private void sendAggregateFlowsStatsFromTableRequest(final NodeStatisticsHandler h, Short tableId) throws InterruptedException, ExecutionException{
- spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey);
+ spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId, h.getTargetNodeKey());
GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
- input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
+ input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, h.getTargetNodeKey()).toInstance()));
input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId));
Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
- multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), tableId);
- this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId()
- , StatsRequestType.AGGR_FLOW);
+ h.recordExpectedTableTransaction(response.get().getResult().getTransactionId(), tableId);
}
private void sendAllNodeConnectorsStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
portStatsService.getAllNodeConnectorsStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
- , StatsRequestType.ALL_PORT);
-
+ h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_PORT);
}
private void sendAllGroupStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
Future<RpcResult<GetAllGroupStatisticsOutput>> response =
groupStatsService.getAllGroupStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
- , StatsRequestType.ALL_GROUP);
-
+ h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_GROUP);
}
public void sendGroupDescriptionRequest(NodeKey node) throws InterruptedException, ExecutionException{
Future<RpcResult<GetGroupDescriptionOutput>> response =
groupStatsService.getGroupDescription(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),
- response.get().getResult().getTransactionId(), StatsRequestType.GROUP_DESC);
+ h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.GROUP_DESC);
}
private void sendAllMeterStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
Future<RpcResult<GetAllMeterStatisticsOutput>> response =
meterStatsService.getAllMeterStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
- , StatsRequestType.ALL_METER);;
-
+ h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_METER);
}
public void sendMeterConfigStatisticsRequest(NodeKey node) throws InterruptedException, ExecutionException {
Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
meterStatsService.getAllMeterConfigStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),
- response.get().getResult().getTransactionId(), StatsRequestType.METER_CONFIG);;
+ h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.METER_CONFIG);;
}
private void sendAllQueueStatsFromAllNodeConnector(NodeStatisticsHandler h) throws InterruptedException, ExecutionException {
Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),
- response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);;
+ h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);;
}
public void sendQueueStatsFromGivenNodeConnector(NodeKey node,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response =
queueStatsService.getQueueStatisticsFromGivenPort(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),
- response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);;
+ h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);;
}
/**