X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fstatistics-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fstatistics%2Fmanager%2FStatisticsProvider.java;h=3ee059d1c0e06f45e7c14a832e088b3dfc906029;hb=6fa3aa30c3eb47ca13520f9b4090bd03bf4e191f;hp=7432db74eb175fc1e35f1736fe2ea5cd1ae0b2e4;hpb=38e4ef09e02d2e1ef4a1a3b1f813783f8a1b7295;p=controller.git diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java index 7432db74eb..3ee059d1c0 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java @@ -96,7 +96,6 @@ public class StatisticsProvider implements AutoCloseable { private static final Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class); private final ConcurrentMap handlers = new ConcurrentHashMap<>(); - private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager(); private final Timer timer = new Timer("statistics-manager", true); private final DataProviderService dps; @@ -118,10 +117,6 @@ public class StatisticsProvider implements AutoCloseable { this.dps = Preconditions.checkNotNull(dataService); } - public MultipartMessageManager getMultipartMessageManager() { - return multipartMessageManager; - } - private final StatisticsListener updateCommiter = new StatisticsListener(StatisticsProvider.this); private Registration listenerRegistration; @@ -156,14 +151,15 @@ public class StatisticsProvider implements AutoCloseable { public void run() { try { // Send stats requests - statsRequestSender(); + for (NodeStatisticsHandler h : handlers.values()) { + sendStatisticsRequestsToNode(h); + } // Perform cleanup for(NodeStatisticsHandler nodeStatisticsAger : handlers.values()){ nodeStatisticsAger.cleanStaleStatistics(); } - multipartMessageManager.cleanStaleTransactionIds(); } catch (RuntimeException e) { spLogger.warn("Failed to request statistics", e); } @@ -209,12 +205,6 @@ public class StatisticsProvider implements AutoCloseable { return dps.beginTransaction(); } - private void statsRequestSender() { - for (NodeStatisticsHandler h : handlers.values()) { - sendStatisticsRequestsToNode(h); - } - } - private void sendStatisticsRequestsToNode(final NodeStatisticsHandler h) { NodeKey targetNode = h.getTargetNodeKey(); spLogger.debug("Send requests for statistics collection to node : {}", targetNode.getId()); @@ -235,11 +225,11 @@ public class StatisticsProvider implements AutoCloseable { } if(groupStatsService != null){ sendAllGroupStatisticsRequest(h); - sendGroupDescriptionRequest(h.getTargetNodeRef()); + sendGroupDescriptionRequest(h); } if(meterStatsService != null){ sendAllMeterStatisticsRequest(h); - sendMeterConfigStatisticsRequest(h.getTargetNodeRef()); + sendMeterConfigStatisticsRequest(h); } if(queueStatsService != null){ sendAllQueueStatsFromAllNodeConnector(h); @@ -259,9 +249,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = flowTableStatsService.getFlowTablesStatistics(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),response.get().getResult().getTransactionId() - , StatsRequestType.ALL_FLOW_TABLE); - + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW_TABLE); } private void sendAllFlowsStatsFromAllTablesRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{ @@ -273,24 +261,26 @@ public class StatisticsProvider implements AutoCloseable { Future> response = flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId() - , StatsRequestType.ALL_FLOW); + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW); + } + public void sendFlowStatsFromTableRequest(NodeKey node, Flow flow) throws InterruptedException, ExecutionException { + final NodeStatisticsHandler h = getStatisticsHandler(node.getId()); + if (h != null) { + sendFlowStatsFromTableRequest(h, flow); + } } - public void sendFlowStatsFromTableRequest(NodeRef targetNode,Flow flow) throws InterruptedException, ExecutionException{ + private void sendFlowStatsFromTableRequest(NodeStatisticsHandler h, Flow flow) throws InterruptedException, ExecutionException{ final GetFlowStatisticsFromFlowTableInputBuilder input = - new GetFlowStatisticsFromFlowTableInputBuilder(); + new GetFlowStatisticsFromFlowTableInputBuilder(flow); - input.setNode(targetNode); - input.fieldsFrom(flow); + input.setNode(h.getTargetNodeRef()); Future> response = flowStatsService.getFlowStatisticsFromFlowTable(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId() - , StatsRequestType.ALL_FLOW); - + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW); } private void sendAggregateFlowsStatsFromAllTablesRequest(final NodeStatisticsHandler h) throws InterruptedException, ExecutionException{ @@ -298,24 +288,22 @@ public class StatisticsProvider implements AutoCloseable { spLogger.debug("Node {} supports {} table(s)", h, tables.size()); for (TableKey key : h.getKnownTables()) { - sendAggregateFlowsStatsFromTableRequest(h.getTargetNodeKey(), key.getId().shortValue()); + sendAggregateFlowsStatsFromTableRequest(h, key.getId().shortValue()); } } - private void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{ + private void sendAggregateFlowsStatsFromTableRequest(final NodeStatisticsHandler h, Short tableId) throws InterruptedException, ExecutionException{ - spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey); + spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId, h.getTargetNodeKey()); GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input = new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder(); - input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance())); + input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, h.getTargetNodeKey()).toInstance())); input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId)); Future> response = flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build()); - multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), tableId); - this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId() - , StatsRequestType.AGGR_FLOW); + h.recordExpectedTableTransaction(response.get().getResult().getTransactionId(), tableId); } private void sendAllNodeConnectorsStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{ @@ -326,9 +314,7 @@ public class StatisticsProvider implements AutoCloseable { Future> response = portStatsService.getAllNodeConnectorsStatistics(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId() - , StatsRequestType.ALL_PORT); - + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_PORT); } private void sendAllGroupStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{ @@ -340,22 +326,25 @@ public class StatisticsProvider implements AutoCloseable { Future> response = groupStatsService.getAllGroupStatistics(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId() - , StatsRequestType.ALL_GROUP); + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_GROUP); + } + public void sendGroupDescriptionRequest(NodeKey node) throws InterruptedException, ExecutionException{ + final NodeStatisticsHandler h = getStatisticsHandler(node.getId()); + if (h != null) { + sendGroupDescriptionRequest(h); + } } - public void sendGroupDescriptionRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ + private void sendGroupDescriptionRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{ final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder(); - input.setNode(targetNode); + input.setNode(h.getTargetNodeRef()); Future> response = groupStatsService.getGroupDescription(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId() - , StatsRequestType.GROUP_DESC); - + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.GROUP_DESC); } private void sendAllMeterStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{ @@ -367,23 +356,26 @@ public class StatisticsProvider implements AutoCloseable { Future> response = meterStatsService.getAllMeterStatistics(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId() - , StatsRequestType.ALL_METER);; + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_METER); + } + public void sendMeterConfigStatisticsRequest(NodeKey node) throws InterruptedException, ExecutionException { + final NodeStatisticsHandler h = getStatisticsHandler(node.getId()); + if (h != null) { + sendMeterConfigStatisticsRequest(h); + } } - public void sendMeterConfigStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{ + private void sendMeterConfigStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{ GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder(); - input.setNode(targetNode); + input.setNode(h.getTargetNodeRef()); Future> response = meterStatsService.getAllMeterConfigStatistics(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId() - , StatsRequestType.METER_CONFIG);; - + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.METER_CONFIG);; } private void sendAllQueueStatsFromAllNodeConnector(NodeStatisticsHandler h) throws InterruptedException, ExecutionException { @@ -394,23 +386,26 @@ public class StatisticsProvider implements AutoCloseable { Future> response = queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId() - , StatsRequestType.ALL_QUEUE_STATS);; + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);; + } + public void sendQueueStatsFromGivenNodeConnector(NodeKey node,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException { + final NodeStatisticsHandler h = getStatisticsHandler(node.getId()); + if (h != null) { + sendQueueStatsFromGivenNodeConnector(h, nodeConnectorId, queueId); + } } - public void sendQueueStatsFromGivenNodeConnector(NodeRef targetNode,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException { + private void sendQueueStatsFromGivenNodeConnector(NodeStatisticsHandler h, NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException { GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder(); - input.setNode(targetNode); + input.setNode(h.getTargetNodeRef()); input.setNodeConnectorId(nodeConnectorId); input.setQueueId(queueId); Future> response = queueStatsService.getQueueStatisticsFromGivenPort(input.build()); - this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId() - , StatsRequestType.ALL_QUEUE_STATS);; - + h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);; } /** @@ -429,13 +424,6 @@ public class StatisticsProvider implements AutoCloseable { return handler; } - @SuppressWarnings("unchecked") - private NodeId getNodeId(NodeRef nodeRef){ - InstanceIdentifier nodeII = (InstanceIdentifier) nodeRef.getValue(); - NodeKey nodeKey = InstanceIdentifier.keyOf(nodeII); - return nodeKey.getId(); - } - @Override public void close() { try {