Move MultipartMessageManager into NodeStatisticsHandler
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / StatisticsProvider.java
index e9d2356cf9552f560b582747e6f3c66a59452cb3..3ee059d1c0e06f45e7c14a832e088b3dfc906029 100644 (file)
@@ -96,7 +96,6 @@ public class StatisticsProvider implements AutoCloseable {
     private static final Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
 
     private final ConcurrentMap<NodeId, NodeStatisticsHandler> handlers = new ConcurrentHashMap<>();
-    private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
     private final Timer timer = new Timer("statistics-manager", true);
     private final DataProviderService dps;
 
@@ -118,10 +117,6 @@ public class StatisticsProvider implements AutoCloseable {
         this.dps = Preconditions.checkNotNull(dataService);
     }
 
-    public MultipartMessageManager getMultipartMessageManager() {
-        return multipartMessageManager;
-    }
-
     private final StatisticsListener updateCommiter = new StatisticsListener(StatisticsProvider.this);
 
     private Registration<NotificationListener> listenerRegistration;
@@ -165,7 +160,6 @@ public class StatisticsProvider implements AutoCloseable {
                         nodeStatisticsAger.cleanStaleStatistics();
                     }
 
-                    multipartMessageManager.cleanStaleTransactionIds();
                 } catch (RuntimeException e) {
                     spLogger.warn("Failed to request statistics", e);
                 }
@@ -255,9 +249,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
                 flowTableStatsService.getFlowTablesStatistics(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),response.get().getResult().getTransactionId()
-                , StatsRequestType.ALL_FLOW_TABLE);
-
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW_TABLE);
     }
 
     private void sendAllFlowsStatsFromAllTablesRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
@@ -269,9 +261,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
                 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
-                , StatsRequestType.ALL_FLOW);
-
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW);
     }
 
     public void sendFlowStatsFromTableRequest(NodeKey node, Flow flow) throws InterruptedException, ExecutionException {
@@ -283,16 +273,14 @@ public class StatisticsProvider implements AutoCloseable {
 
     private void sendFlowStatsFromTableRequest(NodeStatisticsHandler h, Flow flow) throws InterruptedException, ExecutionException{
         final GetFlowStatisticsFromFlowTableInputBuilder input =
-                new GetFlowStatisticsFromFlowTableInputBuilder();
+                new GetFlowStatisticsFromFlowTableInputBuilder(flow);
 
         input.setNode(h.getTargetNodeRef());
-        input.fieldsFrom(flow);
 
         Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response =
                 flowStatsService.getFlowStatisticsFromFlowTable(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),
-                response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW);
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW);
     }
 
     private void sendAggregateFlowsStatsFromAllTablesRequest(final NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
@@ -300,24 +288,22 @@ public class StatisticsProvider implements AutoCloseable {
         spLogger.debug("Node {} supports {} table(s)", h, tables.size());
 
         for (TableKey key : h.getKnownTables()) {
-            sendAggregateFlowsStatsFromTableRequest(h.getTargetNodeKey(), key.getId().shortValue());
+            sendAggregateFlowsStatsFromTableRequest(h, key.getId().shortValue());
         }
     }
 
-    private void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
+    private void sendAggregateFlowsStatsFromTableRequest(final NodeStatisticsHandler h, Short tableId) throws InterruptedException, ExecutionException{
 
-        spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey);
+        spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId, h.getTargetNodeKey());
         GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
                 new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
 
-        input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
+        input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, h.getTargetNodeKey()).toInstance()));
         input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId));
         Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
                 flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
 
-        multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), tableId);
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId()
-                , StatsRequestType.AGGR_FLOW);
+        h.recordExpectedTableTransaction(response.get().getResult().getTransactionId(), tableId);
     }
 
     private void sendAllNodeConnectorsStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
@@ -328,9 +314,7 @@ public class StatisticsProvider implements AutoCloseable {
 
         Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
                 portStatsService.getAllNodeConnectorsStatistics(input.build());
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
-                , StatsRequestType.ALL_PORT);
-
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_PORT);
     }
 
     private void sendAllGroupStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
@@ -342,9 +326,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllGroupStatisticsOutput>> response =
                 groupStatsService.getAllGroupStatistics(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
-                , StatsRequestType.ALL_GROUP);
-
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_GROUP);
     }
 
     public void sendGroupDescriptionRequest(NodeKey node) throws InterruptedException, ExecutionException{
@@ -362,8 +344,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetGroupDescriptionOutput>> response =
                 groupStatsService.getGroupDescription(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),
-                response.get().getResult().getTransactionId(), StatsRequestType.GROUP_DESC);
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.GROUP_DESC);
     }
 
     private void sendAllMeterStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
@@ -375,9 +356,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllMeterStatisticsOutput>> response =
                 meterStatsService.getAllMeterStatistics(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
-                , StatsRequestType.ALL_METER);;
-
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_METER);
     }
 
     public void sendMeterConfigStatisticsRequest(NodeKey node) throws InterruptedException, ExecutionException {
@@ -396,8 +375,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
                 meterStatsService.getAllMeterConfigStatistics(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),
-                response.get().getResult().getTransactionId(), StatsRequestType.METER_CONFIG);;
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.METER_CONFIG);;
     }
 
     private void sendAllQueueStatsFromAllNodeConnector(NodeStatisticsHandler h) throws InterruptedException, ExecutionException {
@@ -408,8 +386,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
                 queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),
-                response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);;
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);;
     }
 
     public void sendQueueStatsFromGivenNodeConnector(NodeKey node,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
@@ -428,8 +405,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response =
                 queueStatsService.getQueueStatisticsFromGivenPort(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),
-                response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);;
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);;
     }
 
     /**