Merge "Bug 1073: Implemented Transaction chain on InMemoryDOMDataStore level."
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / NodeStatisticsHandler.java
index 5d5d17230fda1d9efa458deddc3d5fbb44626c86..7d9cc7ecbd789634e96ddbf32153e534d594dbc4 100644 (file)
@@ -7,42 +7,26 @@
  */
 package org.opendaylight.controller.md.statistics.manager;
 
-import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
 
-import org.opendaylight.controller.md.statistics.manager.MultipartMessageManager.StatsRequestType;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.queue.rev130925.QueueId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetGroupDescriptionOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.NodeGroupFeaturesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsService;
@@ -50,16 +34,11 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.desc.stats.reply.GroupDescStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.statistics.reply.GroupStats;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.NodeMeterFeaturesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsService;
@@ -68,18 +47,11 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.AggregateFlowStatistics;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetAllQueuesStatisticsFromAllPortsOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.GetQueueStatisticsFromGivenPortOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.queue.id.and.statistics.map.QueueIdAndStatisticsMap;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,18 +64,15 @@ import com.google.common.base.Preconditions;
  *
  * @author avishnoi@in.ibm.com
  */
-public final class NodeStatisticsHandler implements AutoCloseable {
+public final class NodeStatisticsHandler implements AutoCloseable, FlowCapableContext {
     private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class);
-    private static final int NUMBER_OF_WAIT_CYCLES = 2;
 
-    private final OpendaylightFlowStatisticsService flowStatsService;
-    private final OpendaylightFlowTableStatisticsService flowTableStatsService;
-    private final OpendaylightGroupStatisticsService groupStatsService;
-    private final OpendaylightMeterStatisticsService meterStatsService;
-    private final OpendaylightPortStatisticsService portStatsService;
-    private final OpendaylightQueueStatisticsService queueStatsService;
+    private static final long STATS_COLLECTION_MILLIS = TimeUnit.SECONDS.toMillis(15);
+    private static final long FIRST_COLLECTION_MILLIS = TimeUnit.SECONDS.toMillis(5);
+    private static final int NUMBER_OF_WAIT_CYCLES = 2;
 
-    private final MultipartMessageManager msgManager = new MultipartMessageManager();
+    private final MultipartMessageManager msgManager;
+    private final StatisticsRequestScheduler srScheduler;
     private final InstanceIdentifier<Node> targetNodeIdentifier;
     private final FlowStatsTracker flowStats;
     private final FlowTableStatsTracker flowTableStats;
@@ -116,6 +85,17 @@ public final class NodeStatisticsHandler implements AutoCloseable {
     private final DataProviderService dps;
     private final NodeRef targetNodeRef;
     private final NodeKey targetNodeKey;
+    private final TimerTask task = new TimerTask() {
+        @Override
+        public void run() {
+            try{
+                requestPeriodicStatistics();
+                cleanStaleStatistics();
+            }catch(Exception e){
+                logger.warn("Exception occured while sending statistics request : {}",e);
+            }
+        }
+    };
 
     public NodeStatisticsHandler(final DataProviderService dps, final NodeKey nodeKey,
             final OpendaylightFlowStatisticsService flowStatsService,
@@ -123,92 +103,94 @@ public final class NodeStatisticsHandler implements AutoCloseable {
             final OpendaylightGroupStatisticsService groupStatsService,
             final OpendaylightMeterStatisticsService meterStatsService,
             final OpendaylightPortStatisticsService portStatsService,
-            final OpendaylightQueueStatisticsService queueStatsService) {
+            final OpendaylightQueueStatisticsService queueStatsService, 
+            final StatisticsRequestScheduler srScheduler) {
         this.dps = Preconditions.checkNotNull(dps);
         this.targetNodeKey = Preconditions.checkNotNull(nodeKey);
+        this.srScheduler = Preconditions.checkNotNull(srScheduler);
         this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
         this.targetNodeRef = new NodeRef(targetNodeIdentifier);
 
-        this.flowStatsService = flowStatsService;
-        this.flowTableStatsService = flowTableStatsService;
-        this.groupStatsService = groupStatsService;
-        this.meterStatsService = meterStatsService;
-        this.portStatsService = portStatsService;
-        this.queueStatsService = queueStatsService;
-
-        final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(StatisticsProvider.STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES);
-        flowStats = new FlowStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
-        flowTableStats = new FlowTableStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
-        groupDescStats = new GroupDescStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
-        groupStats = new GroupStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
-        meterConfigStats = new MeterConfigStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
-        meterStats = new MeterStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
-        nodeConnectorStats = new NodeConnectorStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
-        queueStats = new QueueStatsTracker(targetNodeIdentifier, dps, lifetimeNanos);
+        final long lifetimeNanos = TimeUnit.MILLISECONDS.toNanos(STATS_COLLECTION_MILLIS * NUMBER_OF_WAIT_CYCLES);
+
+        msgManager = new MultipartMessageManager(lifetimeNanos);
+        flowTableStats = new FlowTableStatsTracker(flowTableStatsService, this);
+        flowStats = new FlowStatsTracker(flowStatsService, this, flowTableStats);
+        groupDescStats = new GroupDescStatsTracker(groupStatsService, this);
+        groupStats = new GroupStatsTracker(groupStatsService, this);
+        meterConfigStats = new MeterConfigStatsTracker(meterStatsService, this);
+        meterStats = new MeterStatsTracker(meterStatsService, this);
+        nodeConnectorStats = new NodeConnectorStatsTracker(portStatsService, this);
+        queueStats = new QueueStatsTracker(queueStatsService, this);
     }
 
     public NodeKey getTargetNodeKey() {
         return targetNodeKey;
     }
 
-    public Collection<TableKey> getKnownTables() {
-        return flowTableStats.getTables();
-    }
-
-    public InstanceIdentifier<Node> getTargetNodeIdentifier() {
+    @Override
+    public InstanceIdentifier<Node> getNodeIdentifier() {
         return targetNodeIdentifier;
     }
 
-    public NodeRef getTargetNodeRef() {
+    @Override
+    public NodeRef getNodeRef() {
         return targetNodeRef;
     }
 
-    public synchronized void updateGroupDescStats(TransactionAware transaction, Boolean more, List<GroupDescStats> list) {
-        if (msgManager.isExpectedTransaction(transaction, more)) {
+    @Override
+    public DataModificationTransaction startDataModification() {
+        DataModificationTransaction dmt = dps.beginTransaction();
+        dmt.registerListener(this.srScheduler);
+        return dmt;
+    }
+
+    public synchronized void updateGroupDescStats(TransactionAware transaction, List<GroupDescStats> list) {
+        if (msgManager.isExpectedTransaction(transaction)) {
             groupDescStats.updateStats(list);
         }
     }
 
-    public synchronized void updateGroupStats(TransactionAware transaction, Boolean more, List<GroupStats> list) {
-        if (msgManager.isExpectedTransaction(transaction, more)) {
+    public synchronized void updateGroupStats(TransactionAware transaction, List<GroupStats> list) {
+        if (msgManager.isExpectedTransaction(transaction)) {
             groupStats.updateStats(list);
         }
     }
 
-    public synchronized void updateMeterConfigStats(TransactionAware transaction, Boolean more, List<MeterConfigStats> list) {
-        if (msgManager.isExpectedTransaction(transaction, more)) {
+    public synchronized void updateMeterConfigStats(TransactionAware transaction, List<MeterConfigStats> list) {
+        if (msgManager.isExpectedTransaction(transaction)) {
             meterConfigStats.updateStats(list);
         }
     }
 
-    public synchronized void updateMeterStats(TransactionAware transaction, Boolean more, List<MeterStats> list) {
-        if (msgManager.isExpectedTransaction(transaction, more)) {
+    public synchronized void updateMeterStats(TransactionAware transaction, List<MeterStats> list) {
+        if (msgManager.isExpectedTransaction(transaction)) {
             meterStats.updateStats(list);
         }
     }
 
-    public synchronized void updateQueueStats(TransactionAware transaction, Boolean more, List<QueueIdAndStatisticsMap> list) {
-        if (msgManager.isExpectedTransaction(transaction, more)) {
+    public synchronized void updateQueueStats(TransactionAware transaction, List<QueueIdAndStatisticsMap> list) {
+        if (msgManager.isExpectedTransaction(transaction)) {
             queueStats.updateStats(list);
         }
     }
 
-    public synchronized void updateFlowTableStats(TransactionAware transaction, Boolean more, List<FlowTableAndStatisticsMap> list) {
-        if (msgManager.isExpectedTransaction(transaction, more)) {
+    public synchronized void updateFlowTableStats(TransactionAware transaction, List<FlowTableAndStatisticsMap> list) {
+        if (msgManager.isExpectedTransaction(transaction)) {
             flowTableStats.updateStats(list);
         }
     }
 
-    public synchronized void updateNodeConnectorStats(TransactionAware transaction, Boolean more, List<NodeConnectorStatisticsAndPortNumberMap> list) {
-        if (msgManager.isExpectedTransaction(transaction, more)) {
+    public synchronized void updateNodeConnectorStats(TransactionAware transaction, List<NodeConnectorStatisticsAndPortNumberMap> list) {
+        if (msgManager.isExpectedTransaction(transaction)) {
             nodeConnectorStats.updateStats(list);
         }
     }
 
-    public synchronized void updateAggregateFlowStats(TransactionAware transaction, Boolean more, AggregateFlowStatistics flowStats) {
-        final Short tableId = msgManager.isExpectedTableTransaction(transaction, more);
+    public synchronized void updateAggregateFlowStats(TransactionAware transaction, AggregateFlowStatistics flowStats) {
+        final Short tableId = msgManager.isExpectedTableTransaction(transaction);
         if (tableId != null) {
-            final DataModificationTransaction trans = dps.beginTransaction();
+            final DataModificationTransaction trans = this.startDataModification();
             InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
                     .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId)).toInstance();
 
@@ -229,14 +211,14 @@ public final class NodeStatisticsHandler implements AutoCloseable {
         }
     }
 
-    public synchronized void updateFlowStats(TransactionAware transaction, Boolean more, List<FlowAndStatisticsMapList> list) {
-        if (msgManager.isExpectedTransaction(transaction, more)) {
+    public synchronized void updateFlowStats(TransactionAware transaction, List<FlowAndStatisticsMapList> list) {
+        if (msgManager.isExpectedTransaction(transaction)) {
             flowStats.updateStats(list);
         }
     }
 
     public synchronized void updateGroupFeatures(GroupFeatures notification) {
-        final DataModificationTransaction trans = dps.beginTransaction();
+        final DataModificationTransaction trans = this.startDataModification();
 
         final NodeBuilder nodeData = new NodeBuilder();
         nodeData.setKey(targetNodeKey);
@@ -254,7 +236,7 @@ public final class NodeStatisticsHandler implements AutoCloseable {
     }
 
     public synchronized void updateMeterFeatures(MeterFeatures features) {
-        final DataModificationTransaction trans = dps.beginTransaction();
+        final DataModificationTransaction trans = this.startDataModification();
 
         final NodeBuilder nodeData = new NodeBuilder();
         nodeData.setKey(targetNodeKey);
@@ -272,16 +254,15 @@ public final class NodeStatisticsHandler implements AutoCloseable {
     }
 
     public synchronized void cleanStaleStatistics() {
-        final DataModificationTransaction trans = dps.beginTransaction();
-        final long now = System.nanoTime();
-
-        flowStats.cleanup(trans, now);
-        groupDescStats.cleanup(trans, now);
-        groupStats.cleanup(trans, now);
-        meterConfigStats.cleanup(trans, now);
-        meterStats.cleanup(trans, now);
-        nodeConnectorStats.cleanup(trans, now);
-        queueStats.cleanup(trans, now);
+        final DataModificationTransaction trans = this.startDataModification();
+
+        flowStats.cleanup(trans);
+        groupDescStats.cleanup(trans);
+        groupStats.cleanup(trans);
+        meterConfigStats.cleanup(trans);
+        meterStats.cleanup(trans);
+        nodeConnectorStats.cleanup(trans);
+        queueStats.cleanup(trans);
         msgManager.cleanStaleTransactionIds();
 
         trans.commit();
@@ -290,176 +271,63 @@ public final class NodeStatisticsHandler implements AutoCloseable {
     public synchronized void requestPeriodicStatistics() {
         logger.debug("Send requests for statistics collection to node : {}", targetNodeKey);
 
-        try{
-            if(flowTableStatsService != null){
-                final GetFlowTablesStatisticsInputBuilder input = new GetFlowTablesStatisticsInputBuilder();
-                input.setNode(targetNodeRef);
-
-                Future<RpcResult<GetFlowTablesStatisticsOutput>> response = flowTableStatsService.getFlowTablesStatistics(input.build());
-                recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW_TABLE);
-            }
-            if(flowStatsService != null){
-                // FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
-                //        comes back -- we do not have any tables anyway.
-                sendAggregateFlowsStatsFromAllTablesRequest();
-
-                sendAllFlowsStatsFromAllTablesRequest();
-            }
-            if(portStatsService != null){
-                sendAllNodeConnectorsStatisticsRequest();
-            }
-            if(groupStatsService != null){
-                sendAllGroupStatisticsRequest();
-                sendGroupDescriptionRequest();
-            }
-            if(meterStatsService != null){
-                sendAllMeterStatisticsRequest();
-                sendMeterConfigStatisticsRequest();
-            }
-            if(queueStatsService != null){
-                sendAllQueueStatsFromAllNodeConnector();
-            }
-        } catch(Exception e) {
-            logger.error("Exception occured while sending statistics requests", e);
-        }
-    }
+        this.srScheduler.addRequestToSchedulerQueue(flowTableStats);
+
+        this.srScheduler.addRequestToSchedulerQueue(flowStats);
+        
+        this.srScheduler.addRequestToSchedulerQueue(nodeConnectorStats);
+        
+        this.srScheduler.addRequestToSchedulerQueue(groupStats);
+        
+        this.srScheduler.addRequestToSchedulerQueue(groupDescStats);
+        
+        this.srScheduler.addRequestToSchedulerQueue(meterStats);
+        
+        this.srScheduler.addRequestToSchedulerQueue(meterConfigStats);
+        
+        this.srScheduler.addRequestToSchedulerQueue(queueStats);
+    }
+    
+    public synchronized void start(final Timer timer) {
+        flowStats.start(dps);
+        groupDescStats.start(dps);
+        groupStats.start(dps);
+        meterConfigStats.start(dps);
+        meterStats.start(dps);
+        queueStats.start(dps);
+
+        timer.schedule(task, (long) (Math.random() * FIRST_COLLECTION_MILLIS), STATS_COLLECTION_MILLIS);
+
+        logger.debug("Statistics handler for node started with base interval {}ms", STATS_COLLECTION_MILLIS);
 
-    public synchronized void start() {
         requestPeriodicStatistics();
     }
 
     @Override
     public synchronized void close() {
-        // FIXME: cleanup any resources we hold (registrations, etc.)
-        logger.debug("Statistics handler for {} shut down", targetNodeKey.getId());
-    }
-
-    synchronized void sendFlowStatsFromTableRequest(Flow flow) throws InterruptedException, ExecutionException{
-        final GetFlowStatisticsFromFlowTableInputBuilder input =
-                new GetFlowStatisticsFromFlowTableInputBuilder(flow);
-
-        input.setNode(targetNodeRef);
-
-        Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> response =
-                flowStatsService.getFlowStatisticsFromFlowTable(input.build());
-
-        recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW);
-    }
+        task.cancel();
+        flowStats.close();
+        groupDescStats.close();
+        groupStats.close();
+        meterConfigStats.close();
+        meterStats.close();
+        queueStats.close();
 
-    synchronized void sendGroupDescriptionRequest() throws InterruptedException, ExecutionException{
-        final GetGroupDescriptionInputBuilder input = new GetGroupDescriptionInputBuilder();
+        //Clean up queued statistics request from scheduler queue 
+        srScheduler.removeRequestsFromSchedulerQueue(this.getNodeRef());
 
-        input.setNode(targetNodeRef);
-
-        Future<RpcResult<GetGroupDescriptionOutput>> response =
-                groupStatsService.getGroupDescription(input.build());
-
-        recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.GROUP_DESC);
-    }
-
-    synchronized void sendMeterConfigStatisticsRequest() throws InterruptedException, ExecutionException{
-
-        GetAllMeterConfigStatisticsInputBuilder input = new GetAllMeterConfigStatisticsInputBuilder();
-
-        input.setNode(targetNodeRef);
-
-        Future<RpcResult<GetAllMeterConfigStatisticsOutput>> response =
-                meterStatsService.getAllMeterConfigStatistics(input.build());
-
-        recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.METER_CONFIG);
-    }
-
-    synchronized void sendQueueStatsFromGivenNodeConnector(NodeConnectorId nodeConnectorId, QueueId queueId) throws InterruptedException, ExecutionException {
-        GetQueueStatisticsFromGivenPortInputBuilder input = new GetQueueStatisticsFromGivenPortInputBuilder();
-
-        input.setNode(targetNodeRef);
-        input.setNodeConnectorId(nodeConnectorId);
-        input.setQueueId(queueId);
-        Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> response =
-                queueStatsService.getQueueStatisticsFromGivenPort(input.build());
-
-        recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);;
-    }
-
-    private void sendAllMeterStatisticsRequest() throws InterruptedException, ExecutionException{
-
-        GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
-
-        input.setNode(targetNodeRef);
-
-        Future<RpcResult<GetAllMeterStatisticsOutput>> response =
-                meterStatsService.getAllMeterStatistics(input.build());
-
-        recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_METER);
-    }
-
-    private void sendAllFlowsStatsFromAllTablesRequest() throws InterruptedException, ExecutionException{
-        final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input = new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
-        input.setNode(targetNodeRef);
-
-        Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response = flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
-
-        recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_FLOW);
-    }
-
-    private void sendAggregateFlowsStatsFromAllTablesRequest() throws InterruptedException, ExecutionException{
-        final Collection<TableKey> tables = getKnownTables();
-        logger.debug("Node {} supports {} table(s)", targetNodeKey, tables.size());
-
-        for (TableKey key : tables) {
-            sendAggregateFlowsStatsFromTableRequest(key.getId().shortValue());
-        }
-    }
-
-    private void sendAggregateFlowsStatsFromTableRequest(Short tableId) throws InterruptedException, ExecutionException{
-        logger.debug("Send aggregate stats request for flow table {} to node {}",tableId, targetNodeKey);
-        GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder input =
-                new GetAggregateFlowStatisticsFromFlowTableForAllFlowsInputBuilder();
-
-        input.setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).toInstance()));
-        input.setTableId(new org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId(tableId));
-        Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> response =
-                flowStatsService.getAggregateFlowStatisticsFromFlowTableForAllFlows(input.build());
-
-        recordExpectedTableTransaction(response.get().getResult().getTransactionId(), tableId);
-    }
-
-    private void sendAllQueueStatsFromAllNodeConnector() throws InterruptedException, ExecutionException {
-        GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
-
-        input.setNode(targetNodeRef);
-
-        Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
-                queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
-
-        recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_QUEUE_STATS);
-    }
-
-    private void sendAllNodeConnectorsStatisticsRequest() throws InterruptedException, ExecutionException{
-        final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
-
-        input.setNode(targetNodeRef);
-
-        Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
-                portStatsService.getAllNodeConnectorsStatistics(input.build());
-        recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_PORT);
-    }
-
-    private void sendAllGroupStatisticsRequest() throws InterruptedException, ExecutionException{
-        final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
-        input.setNode(targetNodeRef);
-
-        Future<RpcResult<GetAllGroupStatisticsOutput>> response =
-                groupStatsService.getAllGroupStatistics(input.build());
-
-        recordExpectedTransaction(response.get().getResult().getTransactionId(), StatsRequestType.ALL_GROUP);
+        logger.debug("Statistics handler for {} shut down", targetNodeKey.getId());
     }
 
-    private void recordExpectedTransaction(TransactionId transactionId, StatsRequestType reqType) {
-        msgManager.recordExpectedTransaction(transactionId, reqType);
+    @Override
+    public void registerTransaction(TransactionId id) {
+        msgManager.recordExpectedTransaction(id);
+        logger.debug("Transaction {} for node {} sent successfully", id, targetNodeKey);
     }
 
-    private void recordExpectedTableTransaction(TransactionId transactionId, Short tableId) {
-        msgManager.recordExpectedTableTransaction(transactionId, StatsRequestType.AGGR_FLOW, tableId);
+    @Override
+    public void registerTableTransaction(final TransactionId id, final Short table) {
+        msgManager.recordExpectedTableTransaction(id, table);
+        logger.debug("Transaction {} for node {} table {} sent successfully", id, targetNodeKey, table);
     }
 }