Move MultipartMessageManager into NodeStatisticsHandler 21/5321/3
authorRobert Varga <rovarga@cisco.com>
Thu, 13 Feb 2014 22:37:20 +0000 (23:37 +0100)
committerGerrit Code Review <gerrit@opendaylight.org>
Sat, 15 Feb 2014 02:10:16 +0000 (02:10 +0000)
There is no reason to have a global manager, now each
NodeStatisticsHandler instantiates its own manager -- thus the nodeId
in its tables has no point. This has the added value of being protected
by the NodeStatisticsHandler lock, thus we are prepared to having a
completely consistent view of what operations are really expected even
when switches flap.

Change-Id: I4ca63982506c6f290967040d5626400cb0a0996e
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/MultipartMessageManager.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/NodeStatisticsHandler.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsListener.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsProvider.java

index 2201cb3930e43427cdec6a1921a27f57538fdb04..36062805c875f90120a163304fd7af57b814862e 100644 (file)
@@ -12,8 +12,10 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+
+import com.google.common.base.Preconditions;
 
 /**
  * Main responsibility of the class is to manage multipart response
@@ -23,6 +25,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
  *
  */
 public class MultipartMessageManager {
+    private static final int NUMBER_OF_WAIT_CYCLES = 2;
 
     /*
      *  Map for tx id and type of request, to keep track of all the request sent
@@ -36,24 +39,17 @@ public class MultipartMessageManager {
      */
     private final Map<TxIdEntry,Short> txIdTotableIdMap = new ConcurrentHashMap<>();
 
-    private static final int NUMBER_OF_WAIT_CYCLES =2;
-
     private static final class TxIdEntry {
-        private final TransactionId txId;
-        private final NodeId nodeId;
         private final StatsRequestType requestType;
+        private final TransactionId txId;
 
-        public TxIdEntry(NodeId nodeId, TransactionId txId, StatsRequestType requestType){
+        public TxIdEntry(TransactionId txId, StatsRequestType requestType){
             this.txId = txId;
-            this.nodeId = nodeId;
             this.requestType = requestType;
         }
         public TransactionId getTxId() {
             return txId;
         }
-        public NodeId getNodeId() {
-            return nodeId;
-        }
         public StatsRequestType getRequestType() {
             return requestType;
         }
@@ -61,7 +57,6 @@ public class MultipartMessageManager {
         public int hashCode() {
             final int prime = 31;
             int result = 1;
-            result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
             result = prime * result + ((txId == null) ? 0 : txId.hashCode());
             return result;
         }
@@ -78,13 +73,6 @@ public class MultipartMessageManager {
             }
             TxIdEntry other = (TxIdEntry) obj;
 
-            if (nodeId == null) {
-                if (other.nodeId != null) {
-                    return false;
-                }
-            } else if (!nodeId.equals(other.nodeId)) {
-                return false;
-            }
             if (txId == null) {
                 if (other.txId != null) {
                     return false;
@@ -97,36 +85,42 @@ public class MultipartMessageManager {
 
         @Override
         public String toString() {
-            return "TxIdEntry [txId=" + txId + ", nodeId=" + nodeId + ", requestType=" + requestType + "]";
+            return "TxIdEntry [txId=" + txId + ", requestType=" + requestType + "]";
         }
     }
 
-    public Short getTableIdForTxId(NodeId nodeId,TransactionId id){
-        return txIdTotableIdMap.get(new TxIdEntry(nodeId,id,null));
+    public void recordExpectedTableTransaction(TransactionId id, StatsRequestType type, Short tableId) {
+        recordExpectedTransaction(id, type);
+        txIdTotableIdMap.put(new TxIdEntry(id, null), Preconditions.checkNotNull(tableId));
     }
 
-    public void setTxIdAndTableIdMapEntry(NodeId nodeId, TransactionId id,Short tableId){
-        if(id == null)
-            return;
-        txIdTotableIdMap.put(new TxIdEntry(nodeId,id,null), tableId);
-    }
+    public Short isExpectedTableTransaction(TransactionAware transaction, Boolean more) {
+        if (!isExpectedTransaction(transaction, more)) {
+            return null;
+        }
 
-    public boolean isRequestTxIdExist(NodeId nodeId, TransactionId id, Boolean moreRepliesToFollow){
-        TxIdEntry entry = new TxIdEntry(nodeId,id,null);
-        if(moreRepliesToFollow.booleanValue()){
-            return txIdToRequestTypeMap.containsKey(entry);
-        }else{
-            return txIdToRequestTypeMap.remove(entry) != null;
+        final TxIdEntry key = new TxIdEntry(transaction.getTransactionId(), null);
+        if (more != null && more.booleanValue()) {
+            return txIdTotableIdMap.get(key);
+        } else {
+            return txIdTotableIdMap.remove(key);
         }
     }
 
-    public void addTxIdToRequestTypeEntry (NodeId nodeId, TransactionId id,StatsRequestType type){
-        if(id == null)
-            return;
-        TxIdEntry entry = new TxIdEntry(nodeId,id,type);
+    public void recordExpectedTransaction(TransactionId id, StatsRequestType type) {
+        TxIdEntry entry = new TxIdEntry(Preconditions.checkNotNull(id), Preconditions.checkNotNull(type));
         txIdToRequestTypeMap.put(entry, getExpiryTime());
     }
 
+    public boolean isExpectedTransaction(TransactionAware transaction, Boolean more) {
+        TxIdEntry entry = new TxIdEntry(transaction.getTransactionId(), null);
+        if (more != null && more.booleanValue()) {
+            return txIdToRequestTypeMap.containsKey(entry);
+        } else {
+            return txIdToRequestTypeMap.remove(entry) != null;
+        }
+    }
+
     private static Long getExpiryTime(){
         return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(
                 StatisticsProvider.STATS_COLLECTION_MILLIS*NUMBER_OF_WAIT_CYCLES);
index 17f1ce2a7be482459d1fa78f6423f0bb952b4e92..45788b331954ceaad947977d8cc61ec9e4e4f25d 100644 (file)
@@ -11,6 +11,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
@@ -22,6 +23,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.A
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeaturesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.group.features.GroupFeaturesBuilder;
@@ -59,6 +62,7 @@ public final class NodeStatisticsHandler implements AutoCloseable {
     private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class);
     private static final int NUMBER_OF_WAIT_CYCLES = 2;
 
+    private final MultipartMessageManager msgManager = new MultipartMessageManager();
     private final InstanceIdentifier<Node> targetNodeIdentifier;
     private final FlowStatsTracker flowStats;
     private final FlowTableStatsTracker flowTableStats;
@@ -105,38 +109,52 @@ public final class NodeStatisticsHandler implements AutoCloseable {
         return targetNodeRef;
     }
 
-    public synchronized void updateGroupDescStats(List<GroupDescStats> list) {
-        groupDescStats.updateStats(list);
+    public synchronized void updateGroupDescStats(TransactionAware transaction, Boolean more, List<GroupDescStats> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            groupDescStats.updateStats(list);
+        }
     }
 
-    public synchronized void updateGroupStats(List<GroupStats> list) {
-        groupStats.updateStats(list);
+    public synchronized void updateGroupStats(TransactionAware transaction, Boolean more, List<GroupStats> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            groupStats.updateStats(list);
+        }
     }
 
-    public synchronized void updateMeterConfigStats(List<MeterConfigStats> list) {
-        meterConfigStats.updateStats(list);
+    public synchronized void updateMeterConfigStats(TransactionAware transaction, Boolean more, List<MeterConfigStats> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            meterConfigStats.updateStats(list);
+        }
     }
 
-    public synchronized void updateMeterStats(List<MeterStats> list) {
-        meterStats.updateStats(list);
+    public synchronized void updateMeterStats(TransactionAware transaction, Boolean more, List<MeterStats> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            meterStats.updateStats(list);
+        }
     }
 
-    public synchronized void updateQueueStats(List<QueueIdAndStatisticsMap> list) {
-        queueStats.updateStats(list);
+    public synchronized void updateQueueStats(TransactionAware transaction, Boolean more, List<QueueIdAndStatisticsMap> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            queueStats.updateStats(list);
+        }
     }
 
-    public synchronized void updateFlowTableStats(List<FlowTableAndStatisticsMap> list) {
-        flowTableStats.updateStats(list);
+    public synchronized void updateFlowTableStats(TransactionAware transaction, Boolean more, List<FlowTableAndStatisticsMap> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            flowTableStats.updateStats(list);
+        }
     }
 
-    public synchronized void updateNodeConnectorStats(List<NodeConnectorStatisticsAndPortNumberMap> list) {
-        nodeConnectorStats.updateStats(list);
+    public synchronized void updateNodeConnectorStats(TransactionAware transaction, Boolean more, List<NodeConnectorStatisticsAndPortNumberMap> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            nodeConnectorStats.updateStats(list);
+        }
     }
 
-    public synchronized void updateAggregateFlowStats(Short tableId, AggregateFlowStatistics flowStats) {
+    public synchronized void updateAggregateFlowStats(TransactionAware transaction, Boolean more, AggregateFlowStatistics flowStats) {
+        final Short tableId = msgManager.isExpectedTableTransaction(transaction, more);
         if (tableId != null) {
             final DataModificationTransaction trans = dps.beginTransaction();
-
             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
 
@@ -153,11 +171,16 @@ public final class NodeStatisticsHandler implements AutoCloseable {
             tableBuilder.addAugmentation(AggregateFlowStatisticsData.class, aggregateFlowStatisticsDataBuilder.build());
             trans.putOperationalData(tableRef, tableBuilder.build());
 
-            // FIXME: should we be tracking this data?
             trans.commit();
         }
     }
 
+    public synchronized void updateFlowStats(TransactionAware transaction, Boolean more, List<FlowAndStatisticsMapList> list) {
+        if (msgManager.isExpectedTransaction(transaction, more)) {
+            flowStats.updateStats(list);
+        }
+    }
+
     public synchronized void updateGroupFeatures(GroupFeatures notification) {
         final DataModificationTransaction trans = dps.beginTransaction();
 
@@ -194,10 +217,6 @@ public final class NodeStatisticsHandler implements AutoCloseable {
         trans.commit();
     }
 
-    public synchronized void updateFlowStats(List<FlowAndStatisticsMapList> list) {
-        flowStats.updateStats(list);
-    }
-
     public synchronized void cleanStaleStatistics() {
         final DataModificationTransaction trans = dps.beginTransaction();
         final long now = System.nanoTime();
@@ -209,6 +228,7 @@ public final class NodeStatisticsHandler implements AutoCloseable {
         meterStats.cleanup(trans, now);
         nodeConnectorStats.cleanup(trans, now);
         queueStats.cleanup(trans, now);
+        msgManager.cleanStaleTransactionIds();
 
         trans.commit();
     }
@@ -218,4 +238,14 @@ public final class NodeStatisticsHandler implements AutoCloseable {
         // FIXME: cleanup any resources we hold (registrations, etc.)
         logger.debug("Statistics handler for {} shut down", targetNodeKey.getId());
     }
+
+    // FIXME: this should be private
+    public synchronized void recordExpectedTransaction(TransactionId transactionId, StatsRequestType reqType) {
+        msgManager.recordExpectedTransaction(transactionId, reqType);
+    }
+
+    // FIXME: this should be private
+    public synchronized void recordExpectedTableTransaction(TransactionId transactionId, Short tableId) {
+        msgManager.recordExpectedTableTransaction(transactionId, StatsRequestType.AGGR_FLOW, tableId);
+    }
 }
index 155815dc8802e70b09015d6590f7957e85a44492..bd9f96c875fe2faa6846eb5f2c4f1b076fa5827d 100644 (file)
@@ -44,7 +44,6 @@ public class StatisticsListener implements OpendaylightGroupStatisticsListener,
 
     private final static Logger sucLogger = LoggerFactory.getLogger(StatisticsListener.class);
     private final StatisticsProvider statisticsManager;
-    private final MultipartMessageManager messageManager;
 
     /**
      * default ctor
@@ -52,56 +51,37 @@ public class StatisticsListener implements OpendaylightGroupStatisticsListener,
      */
     public StatisticsListener(final StatisticsProvider manager){
         this.statisticsManager = manager;
-        this.messageManager = this.statisticsManager.getMultipartMessageManager();
     }
 
     @Override
     public void onMeterConfigStatsUpdated(final MeterConfigStatsUpdated notification) {
-        //Check if response is for the request statistics-manager sent.
-        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
-            return;
-
-        //Add statistics to local cache
         final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
         if (handler != null) {
-            handler.updateMeterConfigStats(notification.getMeterConfigStats());
+            handler.updateMeterConfigStats(notification, notification.isMoreReplies(), notification.getMeterConfigStats());
         }
     }
 
     @Override
     public void onMeterStatisticsUpdated(MeterStatisticsUpdated notification) {
-        //Check if response is for the request statistics-manager sent.
-        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
-            return;
-
-        //Add statistics to local cache
         final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
         if (handler != null) {
-            handler.updateMeterStats(notification.getMeterStats());
+            handler.updateMeterStats(notification, notification.isMoreReplies(), notification.getMeterStats());
         }
     }
 
     @Override
     public void onGroupDescStatsUpdated(GroupDescStatsUpdated notification) {
-        //Check if response is for the request statistics-manager sent.
-        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
-            return;
-
         final NodeStatisticsHandler handler = statisticsManager.getStatisticsHandler(notification.getId());
         if (handler != null) {
-            handler.updateGroupDescStats(notification.getGroupDescStats());
+            handler.updateGroupDescStats(notification, notification.isMoreReplies(), notification.getGroupDescStats());
         }
     }
 
     @Override
     public void onGroupStatisticsUpdated(GroupStatisticsUpdated notification) {
-        //Check if response is for the request statistics-manager sent.
-        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
-            return;
-
         final NodeStatisticsHandler handler = statisticsManager.getStatisticsHandler(notification.getId());
         if (handler != null) {
-            handler.updateGroupStats(notification.getGroupStats());
+            handler.updateGroupStats(notification, notification.isMoreReplies(), notification.getGroupStats());
         }
     }
 
@@ -123,65 +103,42 @@ public class StatisticsListener implements OpendaylightGroupStatisticsListener,
 
     @Override
     public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
-        //Check if response is for the request statistics-manager sent.
-        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
-            return;
-
         sucLogger.debug("Received flow stats update : {}",notification.toString());
         final NodeStatisticsHandler sna = this.statisticsManager.getStatisticsHandler(notification.getId());
         if (sna != null) {
-            sna.updateFlowStats(notification.getFlowAndStatisticsMapList());
+            sna.updateFlowStats(notification, notification.isMoreReplies(), notification.getFlowAndStatisticsMapList());
         }
     }
 
     @Override
     public void onAggregateFlowStatisticsUpdate(AggregateFlowStatisticsUpdate notification) {
-        //Check if response is for the request statistics-manager sent.
-        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
-            return;
-
         final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
         if (handler != null) {
-            final Short tableId = messageManager.getTableIdForTxId(notification.getId(),notification.getTransactionId());
-            handler.updateAggregateFlowStats(tableId, notification);
+            handler.updateAggregateFlowStats(notification, notification.isMoreReplies(), notification);
         }
     }
 
     @Override
     public void onNodeConnectorStatisticsUpdate(NodeConnectorStatisticsUpdate notification) {
-        //Check if response is for the request statistics-manager sent.
-        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
-            return;
-
         final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
         if (handler != null) {
-            handler.updateNodeConnectorStats(notification.getNodeConnectorStatisticsAndPortNumberMap());
+            handler.updateNodeConnectorStats(notification, notification.isMoreReplies(), notification.getNodeConnectorStatisticsAndPortNumberMap());
         }
     }
 
     @Override
     public void onFlowTableStatisticsUpdate(FlowTableStatisticsUpdate notification) {
-        //Check if response is for the request statistics-manager sent.
-        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
-            return;
-
         final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
         if (handler != null) {
-            handler.updateFlowTableStats(notification.getFlowTableAndStatisticsMap());
+            handler.updateFlowTableStats(notification, notification.isMoreReplies(), notification.getFlowTableAndStatisticsMap());
         }
     }
 
     @Override
     public void onQueueStatisticsUpdate(QueueStatisticsUpdate notification) {
-        //Check if response is for the request statistics-manager sent.
-        if(!messageManager.isRequestTxIdExist(notification.getId(),notification.getTransactionId(),notification.isMoreReplies()))
-            return;
-
-        //Add statistics to local cache
         final NodeStatisticsHandler handler = this.statisticsManager.getStatisticsHandler(notification.getId());
         if (handler != null) {
-            handler.updateQueueStats(notification.getQueueIdAndStatisticsMap());
+            handler.updateQueueStats(notification, notification.isMoreReplies(), notification.getQueueIdAndStatisticsMap());
         }
     }
 }
-
index e9d2356cf9552f560b582747e6f3c66a59452cb3..3ee059d1c0e06f45e7c14a832e088b3dfc906029 100644 (file)
@@ -96,7 +96,6 @@ public class StatisticsProvider implements AutoCloseable {
     private static final Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
 
     private final ConcurrentMap<NodeId, NodeStatisticsHandler> handlers = new ConcurrentHashMap<>();
-    private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
     private final Timer timer = new Timer("statistics-manager", true);
     private final DataProviderService dps;
 
@@ -118,10 +117,6 @@ public class StatisticsProvider implements AutoCloseable {
         this.dps = Preconditions.checkNotNull(dataService);
     }
 
-    public MultipartMessageManager getMultipartMessageManager() {
-        return multipartMessageManager;
-    }
-
     private final StatisticsListener updateCommiter = new StatisticsListener(StatisticsProvider.this);
 
     private Registration<NotificationListener> listenerRegistration;
@@ -165,7 +160,6 @@ public class StatisticsProvider implements AutoCloseable {
                         nodeStatisticsAger.cleanStaleStatistics();
                     }
 
-                    multipartMessageManager.cleanStaleTransactionIds();
                 } catch (RuntimeException e) {
                     spLogger.warn("Failed to request statistics", e);
                 }
@@ -255,9 +249,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
                 flowTableStatsService.getFlowTablesStatistics(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),response.get().getResult().getTransactionId()
-                , StatsRequestType.ALL_FLOW_TABLE);
-
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW_TABLE);
     }
 
     private void sendAllFlowsStatsFromAllTablesRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
@@ -269,9 +261,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
                 flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
-                , StatsRequestType.ALL_FLOW);
-
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW);
     }
 
     public void sendFlowStatsFromTableRequest(NodeKey node, Flow flow) throws InterruptedException, ExecutionException {
@@ -283,16 +273,14 @@ public class StatisticsProvider implements AutoCloseable {
 
     private void sendFlowStatsFromTableRequest(NodeStatisticsHandler h, Flow flow) throws InterruptedException, ExecutionException{
         final GetFlowStatisticsFromFlowTableInputBuilder input =
-                new GetFlowStatisticsFromFlowTableInputBuilder();
+                new GetFlowStatisticsFromFlowTableInputBuilder(flow);
 
         input.setNode(h.getTargetNodeRef());
-        input.fieldsFrom(flow);
 
         Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response =
                 flowStatsService.getFlowStatisticsFromFlowTable(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),
-                response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW);
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW);
     }
 
     private void sendAggregateFlowsStatsFromAllTablesRequest(final NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
@@ -300,24 +288,22 @@ public class StatisticsProvider implements AutoCloseable {
         spLogger.debug("Node {} supports {} table(s)", h, tables.size());
 
         for (TableKey key : h.getKnownTables()) {
-            sendAggregateFlowsStatsFromTableRequest(h.getTargetNodeKey(), key.getId().shortValue());
+            sendAggregateFlowsStatsFromTableRequest(h, key.getId().shortValue());
         }
     }
 
-    private void sendAggregateFlowsStatsFromTableRequest(NodeKey targetNodeKey,Short tableId) throws InterruptedException, ExecutionException{
+    private void sendAggregateFlowsStatsFromTableRequest(final NodeStatisticsHandler h, Short tableId) throws InterruptedException, ExecutionException{
 
-        spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId,targetNodeKey);
+        spLogger.debug("Send aggregate stats request for flow table {} to node {}",tableId, h.getTargetNodeKey());
         GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
                 new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
 
-        input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
+        input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, h.getTargetNodeKey()).toInstance()));
         input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId));
         Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
                 flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
 
-        multipartMessageManager.setTxIdAndTableIdMapEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId(), tableId);
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(targetNodeKey.getId(), response.get().getResult().getTransactionId()
-                , StatsRequestType.AGGR_FLOW);
+        h.recordExpectedTableTransaction(response.get().getResult().getTransactionId(), tableId);
     }
 
     private void sendAllNodeConnectorsStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
@@ -328,9 +314,7 @@ public class StatisticsProvider implements AutoCloseable {
 
         Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
                 portStatsService.getAllNodeConnectorsStatistics(input.build());
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
-                , StatsRequestType.ALL_PORT);
-
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_PORT);
     }
 
     private void sendAllGroupStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
@@ -342,9 +326,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllGroupStatisticsOutput>> response =
                 groupStatsService.getAllGroupStatistics(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
-                , StatsRequestType.ALL_GROUP);
-
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_GROUP);
     }
 
     public void sendGroupDescriptionRequest(NodeKey node) throws InterruptedException, ExecutionException{
@@ -362,8 +344,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetGroupDescriptionOutput>> response =
                 groupStatsService.getGroupDescription(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),
-                response.get().getResult().getTransactionId(), StatsRequestType.GROUP_DESC);
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.GROUP_DESC);
     }
 
     private void sendAllMeterStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
@@ -375,9 +356,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllMeterStatisticsOutput>> response =
                 meterStatsService.getAllMeterStatistics(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
-                , StatsRequestType.ALL_METER);;
-
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_METER);
     }
 
     public void sendMeterConfigStatisticsRequest(NodeKey node) throws InterruptedException, ExecutionException {
@@ -396,8 +375,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
                 meterStatsService.getAllMeterConfigStatistics(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),
-                response.get().getResult().getTransactionId(), StatsRequestType.METER_CONFIG);;
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.METER_CONFIG);;
     }
 
     private void sendAllQueueStatsFromAllNodeConnector(NodeStatisticsHandler h) throws InterruptedException, ExecutionException {
@@ -408,8 +386,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
                 queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),
-                response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);;
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);;
     }
 
     public void sendQueueStatsFromGivenNodeConnector(NodeKey node,NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
@@ -428,8 +405,7 @@ public class StatisticsProvider implements AutoCloseable {
         Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response =
                 queueStatsService.getQueueStatisticsFromGivenPort(input.build());
 
-        this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),
-                response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);;
+        h.recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);;
     }
 
     /**